In [1]:
import findspark
findspark.init()

In [2]:
import csv
import pandas as pd
import numpy as np
from pyspark import SparkConf,SparkContext
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from datetime import datetime
import pyspark.sql.functions as func


In [3]:
spark = SparkSession.builder.config("master", "local[4]").getOrCreate()




In [4]:
df=spark.read.csv(r'data.csv',header=True,inferSchema=True)

In [6]:
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---+--------+----+-------------------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|day|week_day|hour|               date|          duration|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+---+--------+----+-------------------+------------------+
|       2| 2019-12-02 10:09:59|  2019-12-02 10:50:00

In [7]:
df.dtypes

[('VendorID', 'int'),
 ('tpep_pickup_datetime', 'timestamp'),
 ('tpep_dropoff_datetime', 'timestamp'),
 ('passenger_count', 'int'),
 ('trip_distance', 'double'),
 ('RatecodeID', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('PULocationID', 'int'),
 ('DOLocationID', 'int'),
 ('payment_type', 'int'),
 ('fare_amount', 'double'),
 ('extra', 'double'),
 ('mta_tax', 'double'),
 ('tip_amount', 'double'),
 ('tolls_amount', 'int'),
 ('improvement_surcharge', 'double'),
 ('total_amount', 'double'),
 ('congestion_surcharge', 'double'),
 ('day', 'int'),
 ('week_day', 'int'),
 ('hour', 'int'),
 ('date', 'timestamp'),
 ('duration', 'double')]

In [5]:
def set_ID(a,b):
    if a>=b:
        return 1000*b+a
    else :
        return 1000*a+b

In [6]:
df1=df.select('trip_distance','PULocationID','DOLocationID','day','week_day','hour','duration')


+-------------+------------+------------+---+--------+----+------------------+
|trip_distance|PULocationID|DOLocationID|day|week_day|hour|          duration|
+-------------+------------+------------+---+--------+----+------------------+
|        17.67|         233|           1|  2|       2|  10|40.016666666666666|
|        17.92|          48|           1|  4|       4|   5|              32.0|
|        15.57|         161|           1|  4|       4|  12| 41.06666666666667|
|         1.78|         211|           4|  1|       1|   0|               9.8|
|          2.5|          68|           4|  1|       1|   5|              11.8|
|         5.47|         162|           4|  1|       1|  11| 35.46666666666667|
|         1.23|         107|           4|  1|       1|  16|              9.45|
|          3.6|          80|           4|  1|       1|  21|10.933333333333334|
|         3.14|         186|           4|  1|       1|  23|15.016666666666667|
|         8.89|         138|           4|  2|       

In [7]:
df2=df1.rdd.map(lambda x:Row(x[0],x[3],x[4],x[5],x[6],set_ID(x[1],x[2]))).toDF(['trip_distance','day','week_day','hour', 'duration','ID'])

In [8]:
df2=df2.groupBy('ID').agg({'duration':'mean'}).sort('ID')


In [9]:
df2=df2.withColumnRenamed('avg(duration)','duration')


In [10]:
df2.show()

+----+------------------+
|  ID|          duration|
+----+------------------+
|1001|  2.36358024691358|
|1004| 52.06666666666667|
|1012| 88.18333333333334|
|1013| 39.99895833333333|
|1024| 62.15833333333333|
|1025|              96.1|
|1033|              67.1|
|1036|             49.75|
|1039| 41.43333333333333|
|1041| 51.22222222222222|
|1043| 53.38761904761905|
|1045|57.300000000000004|
|1048|41.537363834422656|
|1050|36.535185185185185|
|1065| 33.36666666666667|
|1066|             37.95|
|1068| 42.40700483091788|
|1075|             47.75|
|1079|  43.0186274509804|
|1083|              57.2|
+----+------------------+
only showing top 20 rows



In [10]:
T=df2.selectExpr('ID%1000 as ID1','duration','ID')
T=T.selectExpr('ID1','(ID-ID1)/1000 as ID2','duration')
T=T.select('ID1',T['ID2'].astype(IntegerType()),'duration')

In [17]:
T.show()

+---+---+------------------+
|ID1|ID2|          distance|
+---+---+------------------+
|  1|  1| 1.428148148148148|
|  4|  1|             16.23|
| 12|  1|             16.95|
| 13|  1|15.268750000000002|
| 24|  1|26.192500000000003|
| 25|  1|             18.93|
| 33|  1|             18.88|
| 36|  1|             19.85|
| 39|  1|             28.23|
| 41|  1|24.573333333333334|
| 43|  1|19.497714285714284|
| 45|  1| 18.73428571428571|
| 48|  1| 17.83666666666667|
| 50|  1| 17.31222222222222|
| 65|  1|            16.855|
| 66|  1|              15.3|
| 68|  1| 16.65536231884058|
| 75|  1|             21.82|
| 79|  1| 15.44705882352941|
| 83|  1|             20.29|
+---+---+------------------+
only showing top 20 rows



In [11]:
dfz=spark.read.csv(r'taxi_zones.csv',header=True,inferSchema=True)

In [12]:
dfz0=dfz.join(T, T.ID1 == dfz.LocationID, "inner")

In [13]:
dfz1=dfz0.select('ID1','ID2','Borough','duration')
dfz1=dfz1.withColumnRenamed('Borough','Borough1')

In [14]:
dfz1=dfz1.join(dfz,dfz1.ID2==dfz.LocationID,"inner")

In [55]:
dfz1.show()

+---+---+--------------------+------------------+----------+---------+--------------------+------------+
|ID1|ID2|               Zone1|          distance|LocationID|  Borough|                Zone|service_zone|
+---+---+--------------------+------------------+----------+---------+--------------------+------------+
|132| 40|         JFK Airport|21.373728571428575|        40| Brooklyn|     Carroll Gardens|   Boro Zone|
|132| 86|         JFK Airport| 8.541727272727274|        86|   Queens|        Far Rockaway|   Boro Zone|
|126| 42|         Hunts Point| 4.035500000000001|        42|Manhattan|Central Harlem North|   Boro Zone|
|230|193|Times Sq/Theatre ...|3.8778378378378378|       193|   Queens|Queensbridge/Rave...|   Boro Zone|
|231|209|TriBeCa/Civic Center|0.9718301982714795|       209|Manhattan|             Seaport| Yellow Zone|
|265|265|                  NA| 4.348022181146027|       265|  Unknown|                  NA|         N/A|
|114| 66|Greenwich Village...| 3.093392070484581|      

In [15]:
df_zone=dfz1.select('ID1','ID2','Borough','Borough1','duration')
df_zone=df_zone.withColumnRenamed('Borough','Borough2')

In [16]:
df_zone=df_zone.sort('ID2','ID1')

In [55]:
df_zone.show()

+---+---+--------+---------+------------------+
|ID1|ID2|Borough2| Borough1|          duration|
+---+---+--------+---------+------------------+
|  1|  1|     EWR|      EWR|  2.36358024691358|
|  4|  1|     EWR|Manhattan| 52.06666666666667|
| 12|  1|     EWR|Manhattan| 88.18333333333334|
| 13|  1|     EWR|Manhattan| 39.99895833333333|
| 24|  1|     EWR|Manhattan| 62.15833333333333|
| 25|  1|     EWR| Brooklyn|              96.1|
| 33|  1|     EWR| Brooklyn|              67.1|
| 36|  1|     EWR| Brooklyn|             49.75|
| 39|  1|     EWR| Brooklyn| 41.43333333333333|
| 41|  1|     EWR|Manhattan| 51.22222222222222|
| 43|  1|     EWR|Manhattan| 53.38761904761905|
| 45|  1|     EWR|Manhattan|57.300000000000004|
| 48|  1|     EWR|Manhattan|41.537363834422656|
| 50|  1|     EWR|Manhattan|36.535185185185185|
| 65|  1|     EWR| Brooklyn| 33.36666666666667|
| 66|  1|     EWR| Brooklyn|             37.95|
| 68|  1|     EWR|Manhattan| 42.40700483091788|
| 75|  1|     EWR|Manhattan|            

In [17]:
df_zone.coalesce(1).write.option("header", "true").csv("Brough.csv")

In [11]:
df3=df1.groupBy('PULocationID','DOLocationID').agg({'trip_distance':'mean'}).sort('PULocationID','DOLocationID')
df3.show()

+------------+------------+------------------+
|PULocationID|DOLocationID|avg(trip_distance)|
+------------+------------+------------------+
|           1|           1| 1.428148148148148|
|           1|         162|             35.07|
|           1|         264|              0.79|
|           1|         265|             13.58|
|           2|          45|             19.27|
|           2|         216|               3.8|
|           2|         218|               3.5|
|           2|         237|              17.2|
|           3|           3|1.0975000000000001|
|           3|          32|1.0333333333333334|
|           3|          42|             9.145|
|           3|          47|               2.8|
|           3|          51|              2.29|
|           3|          78|3.6999999999999997|
|           3|          81|             2.635|
|           3|         127|               4.5|
|           3|         142|             13.63|
|           3|         147|               4.4|
|           3

In [12]:
df3.where(df3['DOLocationID']==1).show()

+------------+------------+------------------+
|PULocationID|DOLocationID|avg(trip_distance)|
+------------+------------+------------------+
|           1|           1| 1.428148148148148|
|           4|           1|             16.23|
|          12|           1|             16.95|
|          13|           1|15.268750000000002|
|          24|           1|26.192500000000003|
|          25|           1|             18.93|
|          33|           1|             18.88|
|          36|           1|             19.85|
|          39|           1|             28.23|
|          41|           1|24.573333333333334|
|          43|           1|19.497714285714284|
|          45|           1| 18.73428571428571|
|          48|           1| 17.83666666666667|
|          50|           1| 17.31222222222222|
|          65|           1|            16.855|
|          66|           1|              15.3|
|          68|           1| 16.65536231884058|
|          75|           1|             21.82|
|          79

In [3]:
dfz=spark.read.csv(r'taxi_zone.csv',header=True,inferSchema=True)

NameError: name 'spark' is not defined

In [18]:
hours = df2.pivot_table(
    index='day',
    columns='hour',
    values='ID',
    aggfunc='count'
)

AttributeError: 'DataFrame' object has no attribute 'pivot_table'

In [29]:
df2.coalesce(1).write.option("header", "true").csv("data2.csv")

In [30]:
df0=df2.select("*").toPandas()

Py4JJavaError: An error occurred while calling o200.collectToPython.
: java.lang.OutOfMemoryError: GC overhead limit exceeded
	at org.apache.spark.sql.execution.SparkPlan$$anon$1._next(SparkPlan.scala:391)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.getNext(SparkPlan.scala:402)
	at org.apache.spark.sql.execution.SparkPlan$$anon$1.getNext(SparkPlan.scala:388)
	at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at org.apache.spark.util.NextIterator.foreach(NextIterator.scala:21)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollect$1(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeCollect$1$adapted(SparkPlan.scala:424)
	at org.apache.spark.sql.execution.SparkPlan$$Lambda$4046/1514218375.apply(Unknown Source)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:424)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3688)
	at org.apache.spark.sql.Dataset$$Lambda$3999/1007198195.apply(Unknown Source)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3858)
	at org.apache.spark.sql.Dataset$$Lambda$1925/1962184514.apply(Unknown Source)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:510)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset$$Lambda$1591/70266614.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1602/795154898.apply(Unknown Source)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.execution.SQLExecution$$$Lambda$1592/1901635149.apply(Unknown Source)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3856)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3685)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
