In [1]:
import findspark
# /opt/manual/spark: this is SPARK_HOME path
findspark.init("/opt/manual/spark")
from pyspark.sql import SparkSession,functions as F 

In [2]:
spark = (
    SparkSession.builder
    .appName("Taxi Elasticsearch")
    .master("local[2]")
    .config("spark.sql.shuffle.partitions", 4)
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.jars.packages", "org.elasticsearch:elasticsearch-spark-30_2.12:7.12.1") 
    .getOrCreate()
)



:: loading settings :: url = jar:file:/opt/manual/spark-3.1.1-bin-hadoop3.2/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/train/.ivy2/cache
The jars for the packages stored in: /home/train/.ivy2/jars
org.elasticsearch#elasticsearch-spark-30_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fc6320fd-fd88-44fa-90f8-ce4a7c57e207;1.0
	confs: [default]
	found org.elasticsearch#elasticsearch-spark-30_2.12;7.12.1 in central
	found org.scala-lang#scala-reflect;2.12.8 in central
	found org.slf4j#slf4j-api;1.7.6 in local-m2-cache
	found commons-logging#commons-logging;1.1.1 in local-m2-cache
	found javax.xml.bind#jaxb-api;2.3.1 in central
	found com.google.protobuf#protobuf-java;2.5.0 in local-m2-cache
	found org.apache.spark#spark-yarn_2.12;3.0.1 in central
:: resolution report :: resolve 3100ms :: artifacts dl 53ms
	:: modules in use:
	com.google.protobuf#protobuf-java;2.5.0 from local-m2-cache in [default]
	commons-logging#commons-logging;1.1.1 from local-m2-cache in [default]
	javax.xml.bind#jaxb-api;2.3.1 from central in [default]
	org.

In [3]:
df = spark.read.option("header", True) \
    .option("inferSchema", True) \
    .csv("file:///home/train/datasets/nyc_taxi.csv")
df.limit(5).toPandas()

                                                                                

Unnamed: 0,id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration
0,id2875421,2,2016-03-14 17:24:55,2016-03-14 17:32:30,1,-73.982155,40.767937,-73.96463,40.765602,N,455
1,id2377394,1,2016-06-12 00:43:35,2016-06-12 00:54:38,1,-73.980415,40.738564,-73.999481,40.731152,N,663
2,id3858529,2,2016-01-19 11:35:24,2016-01-19 12:10:48,1,-73.979027,40.763939,-74.005333,40.710087,N,2124
3,id3504673,2,2016-04-06 19:32:31,2016-04-06 19:39:40,1,-74.01004,40.719971,-74.012268,40.706718,N,429
4,id2181028,2,2016-03-26 13:30:55,2016-03-26 13:38:10,1,-73.973053,40.793209,-73.972923,40.78252,N,435


In [4]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)



In [18]:
subset = df.limit(2000)

subset.write \
.format("csv") \
.mode("overwrite") \
.save("file:///home/train/datasets/taxi_subset_train.csv")

                                                                                

In [5]:
from pyspark.sql.types import StringType, FloatType
from math import radians, cos, sin, asin, sqrt

def switch_tr_day(day_index):
    my_dict = {
        1: 'Pazar',
        2: 'Pazartesi',
        3: 'Salı',
        4: 'Çarşamba',
        5: 'Perşembe',
        6: 'Cuma',
        7: 'Cumartesi'
    }
    
    return my_dict.get(day_index)

def switch_month_day(month_index):
    my_dict = {
        1: 'Ocak',
        2: 'Subat',
        3: 'Mart',
        4: 'Nisan',
        5: 'Mayis',
        6: 'Haziran',
        7: 'Temmuz',
        8: 'Agustos',
        9: 'Eylul',
        10: 'Ekim',
        11: 'Kasim',
        12: 'Aralik'
    }
    
    return my_dict.get(month_index)

def haversine(lon1, lat1, lon2, lat2):
    """
    Calculate the great circle distance in kilometers between two points 
    on the earth (specified in decimal degrees)
    """
    # convert decimal degrees to radians 
    lon1, lat1, lon2, lat2 = map(radians, [lon1, lat1, lon2, lat2])

    # haversine formula 
    dlon = lon2 - lon1 
    dlat = lat2 - lat1 
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * asin(sqrt(a)) 
    r = 6371 # Radius of earth in kilometers. Use 3956 for miles. Determines return value units.
    return c * r

In [6]:
haversine_distance = F.udf(lambda lon1, lat1, lon2, lat2: haversine(lon1, lat1, lon2, lat2), FloatType())
spark.udf.register("haversine_distance", haversine_distance)

switch_month = F.udf(lambda z: switch_month_day(z), StringType())
spark.udf.register("switch_month", switch_month)

switch_tr = F.udf(lambda z: switch_tr_day(z), StringType())
spark.udf.register("switch_tr", switch_tr)

<function __main__.<lambda>(z)>

In [7]:
df2 = df.withColumn("pickup_datetime",
                                   F.to_timestamp(F.col("pickup_datetime"), "yyyy-MM-dd HH:mm:ss")) \
            .withColumn("dropoff_datetime",
                        F.to_timestamp(F.col("dropoff_datetime"), "yyyy-MM-dd HH:mm:ss"))

In [8]:
df3 = df2.withColumn("pickup_year",
                        F.year(F.to_date(F.col("pickup_datetime")))) \
            .withColumn("pickup_month",
                        F.month(F.to_date(F.col("pickup_datetime")))) \
            .withColumn("pickup_dayofweek",
                        F.dayofweek(F.to_date(F.col("pickup_datetime")))) \
            .withColumn("pickup_hour",
                        F.hour(F.col("pickup_datetime"))) \
            .withColumn("pickupDayofWeek_TR",
                        switch_tr(F.col("pickup_dayofweek"))) \
            .withColumn("pickupMonth_TR",
                        switch_month(F.col("pickup_month"))) \
            .withColumn("haversine_distance(km)",
                        haversine_distance(F.col("pickup_longitude"), F.col("pickup_latitude"),
                                           F.col("dropoff_longitude"),
                                           F.col("dropoff_latitude"))) \
            .withColumn("travel_speed", 
                        1000 * F.col("haversine_distance(km)") / F.col("trip_duration")) \
            .drop("pickup_datetime", "dropoff_datetime")

In [80]:
df2.show(5)

+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|       id|vendor_id|    pickup_datetime|   dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+---------+-------------------+-------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|id2875421|        2|2016-03-14 17:24:55|2016-03-14 17:32:30|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|
|id2377394|        1|2016-06-12 00:43:35|2016-06-12 00:54:38|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|          663|
|id3858529|        2|2016-01-19 11:35:24|2016-01-19 12:10:48|    

In [9]:
df3.limit(5).toPandas()

Traceback (most recent call last):                                  (0 + 1) / 1]
  File "/opt/manual/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/opt/manual/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/opt/manual/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/manual/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
                                                                                

Unnamed: 0,id,vendor_id,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration,pickup_year,pickup_month,pickup_dayofweek,pickup_hour,pickupDayofWeek_TR,pickupMonth_TR,haversine_distance(km),travel_speed
0,id2875421,2,1,-73.982155,40.767937,-73.96463,40.765602,N,455,2016,3,2,17,Pazartesi,Mart,1.498521,3.293452
1,id2377394,1,1,-73.980415,40.738564,-73.999481,40.731152,N,663,2016,6,1,0,Pazar,Haziran,1.805507,2.723239
2,id3858529,2,1,-73.979027,40.763939,-74.005333,40.710087,N,2124,2016,1,3,11,Salı,Ocak,6.385098,3.006167
3,id3504673,2,1,-74.01004,40.719971,-74.012268,40.706718,N,429,2016,4,4,19,Çarşamba,Nisan,1.485498,3.4627
4,id2181028,2,1,-73.973053,40.793209,-73.972923,40.78252,N,435,2016,3,7,13,Cumartesi,Mart,1.188589,2.732387


In [10]:
df3.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
 |-- pickup_year: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- pickup_dayofweek: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pickupDayofWeek_TR: string (nullable = true)
 |-- pickupMonth_TR: string (nullable = true)
 |-- haversine_distance(km): float (nullable = true)
 |-- travel_speed: double (nullable = true)



In [11]:
taxi_subset = df3.limit(1000)

In [12]:
taxi_subset.printSchema()

root
 |-- id: string (nullable = true)
 |-- vendor_id: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_duration: integer (nullable = true)
 |-- pickup_year: integer (nullable = true)
 |-- pickup_month: integer (nullable = true)
 |-- pickup_dayofweek: integer (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- pickupDayofWeek_TR: string (nullable = true)
 |-- pickupMonth_TR: string (nullable = true)
 |-- haversine_distance(km): float (nullable = true)
 |-- travel_speed: double (nullable = true)



In [13]:
nyc_taxi_index =  {
  "settings": {
    "index": {
      "analysis": {
        "analyzer": {
          "custom_analyzer":
          {
            "type":"custom",
            "tokenizer":"standard",
            "filter":[
              "lowercase", "custom_edge_ngram","asciifolding"
            ]
          }
        },
        "filter": {
          "custom_edge_ngram": {
            "type": "edge_ngram",
            "min_gram":2,
            "max_gram": 20
            }
          }
        }
      }
    },
    "mappings": {
    "properties": {
      "id": { "type": "text" },  
      "vendor_id":  { "type": "integer"  },
      "passenger_count": {"type":   "integer"},
      "pickup_longitude": {"type": "double"},
      "pickup_latitude": {"type": "double"},
      "pickup_location": { "type": "geo_point" },
      "dropoff_longitude": {"type": "double"},
      "dropoff_latitude": {"type": "double"},
      "store_and_fwd_flag":    { "type": "keyword" },
      "trip_duration":  { "type": "integer"  }, 
      "pickup_year":   { "type": "integer"  },
      "pickup_month": {"type": "integer"},
      "pickup_dayofweek": {"type":   "integer"},
      "pickup_hour": {"type":   "integer"},
      "pickupDayofWeek_TR": {"type": "keyword"},
      "pickupMonth_TR": {"type": "keyword"},
      "haversine_distance(km)": {"type": "float"},
      "travel_speed": {"type": "double"}
    }
  }
  }

In [14]:
from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()


try:
    es.indices.delete("nyc-taxi-spark")
    print("nyc-taxi-spark  index deleted.")
except:
    print("No index")

No index


  es.indices.delete("nyc-taxi-spark")


In [15]:
es.indices.create("nyc-taxi-spark", body=nyc_taxi_index)

  es.indices.create("nyc-taxi-spark", body=nyc_taxi_index)
  es.indices.create("nyc-taxi-spark", body=nyc_taxi_index)


{'acknowledged': True, 'shards_acknowledged': True, 'index': 'nyc-taxi-spark'}

In [16]:
import time
start_time = time.time()

taxi_subset.write \
    .format("org.elasticsearch.spark.sql") \
    .mode("append") \
    .option("es.nodes", "localhost") \
    .option("es.port","9200") \
    .save("nyc-taxi-spark")


print("----- %s secs -----" %(time.time() - start_time))

Traceback (most recent call last):                                  (0 + 2) / 2]
  File "/opt/manual/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/opt/manual/spark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/opt/manual/spark/python/lib/pyspark.zip/pyspark/worker.py", line 643, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/opt/manual/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 564, in read_int
    raise EOFError
EOFError
[Stage 5:>                                                          (0 + 1) / 1]

----- 14.80552363395691 secs -----


                                                                                

In [91]:
spark.stop()