In [1]:
!which python

//anaconda/envs/spark/bin/python


In [2]:
!python --version

Python 2.7.13 :: Continuum Analytics, Inc.


In [3]:
# make sure SparkContext is loaded 
sc

<pyspark.context.SparkContext at 0x108883c90>

In [37]:
# load library 
import pandas as pd, numpy as np
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext


In [5]:
# load applications  
conf = SparkConf().setAppName("building a warehouse")
#sc = SparkContext(conf=conf)
sqlCtx = SQLContext(sc)


In [7]:
cd NYC_Taxi_Trip_Duration/spark_/

/Users/yennanliu/NYC_Taxi_Trip_Duration/spark_


### 1) Read csv via spark SQL

In [8]:
# read csv via spark SQL  
df_train = sqlContext.read.format('com.databricks.spark.csv')\
						 .options(header='true', inferschema='true')\
						 .load('/Users/yennanliu/NYC_Taxi_Trip_Duration/data/train.csv')

In [30]:
# check data type
df_train.dtypes

[('id', 'string'),
 ('vendor_id', 'int'),
 ('pickup_datetime', 'timestamp'),
 ('dropoff_datetime', 'timestamp'),
 ('passenger_count', 'int'),
 ('pickup_longitude', 'double'),
 ('pickup_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('dropoff_latitude', 'double'),
 ('store_and_fwd_flag', 'string'),
 ('trip_duration', 'int')]

In [32]:
# check missing values 
df_train.where( df_train['vendor_id'].isNull() ).count()

0

In [9]:
df_train.show()

+---------+---------+--------------------+--------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|       id|vendor_id|     pickup_datetime|    dropoff_datetime|passenger_count|  pickup_longitude|   pickup_latitude| dropoff_longitude|  dropoff_latitude|store_and_fwd_flag|trip_duration|
+---------+---------+--------------------+--------------------+---------------+------------------+------------------+------------------+------------------+------------------+-------------+
|id2875421|        2|2016-03-14 17:24:...|2016-03-14 17:32:...|              1| -73.9821548461914| 40.76793670654297|-73.96463012695312|40.765602111816406|                 N|          455|
|id2377394|        1|2016-06-12 00:43:...|2016-06-12 00:54:...|              1|-73.98041534423828|40.738563537597656|-73.99948120117188| 40.73115158081055|                 N|          663|
|id3858529|        2|2016-01-19 11:35:...|2016-01-19 12

In [35]:
#df_grp = df_train.groupBy('vendor_id')
#df_grp.sum('pickup_longitude','dropoff_longitude').show()



In [10]:
df_train.take(4)

[Row(id=u'id2875421', vendor_id=2, pickup_datetime=datetime.datetime(2016, 3, 14, 17, 24, 55), dropoff_datetime=datetime.datetime(2016, 3, 14, 17, 32, 30), passenger_count=1, pickup_longitude=-73.9821548461914, pickup_latitude=40.76793670654297, dropoff_longitude=-73.96463012695312, dropoff_latitude=40.765602111816406, store_and_fwd_flag=u'N', trip_duration=455),
 Row(id=u'id2377394', vendor_id=1, pickup_datetime=datetime.datetime(2016, 6, 12, 0, 43, 35), dropoff_datetime=datetime.datetime(2016, 6, 12, 0, 54, 38), passenger_count=1, pickup_longitude=-73.98041534423828, pickup_latitude=40.738563537597656, dropoff_longitude=-73.99948120117188, dropoff_latitude=40.73115158081055, store_and_fwd_flag=u'N', trip_duration=663),
 Row(id=u'id3858529', vendor_id=2, pickup_datetime=datetime.datetime(2016, 1, 19, 11, 35, 24), dropoff_datetime=datetime.datetime(2016, 1, 19, 12, 10, 48), passenger_count=1, pickup_longitude=-73.9790267944336, pickup_latitude=40.763938903808594, dropoff_longitude=-74.

In [11]:
df_train.registerTempTable("df_train_table")
sqlContext.sql("""
                SELECT id, count(*) 
                FROM df_train_table
                group by 1 
                order by 2 desc 
                limit 10""").show()


+---------+--------+
|       id|count(1)|
+---------+--------+
|id3013319|       1|
|id1622754|       1|
|id2187774|       1|
|id3921267|       1|
|id2795297|       1|
|id0130048|       1|
|id2088360|       1|
|id0454719|       1|
|id2366364|       1|
|id0187208|       1|
+---------+--------+



### 1-1) window

In [42]:
from pyspark.sql import Window
from pyspark.sql.functions import mean

In [41]:
window = Window.partitionBy('id', 'vendor_id')\
               .orderBy('pickup_datetime')\
               .rowsBetween(-3, 3)

window

<pyspark.sql.window.WindowSpec at 0x10a190250>

In [43]:
moving_avg = mean(df_train['passenger_count']).over(window)
moving_avg

Column<avg(passenger_count) OVER (PARTITION BY id, vendor_id ORDER BY pickup_datetime ASC ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING)>

In [45]:
df_window_ = df_train.withColumn('moving_avg', moving_avg)
df_window_.take(3)

[Row(id=u'id0000015', vendor_id=1, pickup_datetime=datetime.datetime(2016, 5, 17, 9, 6, 59), dropoff_datetime=datetime.datetime(2016, 5, 17, 9, 39, 18), passenger_count=1, pickup_longitude=-73.98369598388672, pickup_latitude=40.780948638916016, dropoff_longitude=-73.95437622070312, dropoff_latitude=40.76417541503906, store_and_fwd_flag=u'N', trip_duration=1939, moving_avg=1.0),
 Row(id=u'id0000023', vendor_id=2, pickup_datetime=datetime.datetime(2016, 5, 28, 4, 34, 47), dropoff_datetime=datetime.datetime(2016, 5, 28, 5, 1, 47), passenger_count=1, pickup_longitude=-73.94206237792969, pickup_latitude=40.817779541015625, dropoff_longitude=-73.7889175415039, dropoff_latitude=40.64738845825195, store_and_fwd_flag=u'N', trip_duration=1620, moving_avg=1.0),
 Row(id=u'id0000250', vendor_id=1, pickup_datetime=datetime.datetime(2016, 3, 30, 8, 38, 35), dropoff_datetime=datetime.datetime(2016, 3, 30, 8, 46, 42), passenger_count=1, pickup_longitude=-73.99744415283203, pickup_latitude=40.7363395690

### 1-2) pivot

In [53]:
df_pivot1 = df_train.groupby('pickup_datetime')\
                    .pivot('id', values=['passenger_count'])\
                    .sum('dropoff_longitude')
        
df_pivot1                                                                                
                                                                                

DataFrame[pickup_datetime: timestamp, passenger_count: double]

In [54]:
df_pivot1.take(2)

[Row(pickup_datetime=datetime.datetime(2016, 6, 30, 18, 23, 16), passenger_count=None),
 Row(pickup_datetime=datetime.datetime(2016, 4, 20, 11, 38, 30), passenger_count=None)]

### 2) basic functions 

In [14]:
pwd

u'/Users/yennanliu/NYC_Taxi_Trip_Duration/spark_'

In [15]:
df__ = sc.textFile("/Users/yennanliu/NYC_Taxi_Trip_Duration/data/train.csv")
#df__.filter(lambda x: '2124' in x.).collect()

In [16]:
df__.take(5)

[u'id,vendor_id,pickup_datetime,dropoff_datetime,passenger_count,pickup_longitude,pickup_latitude,dropoff_longitude,dropoff_latitude,store_and_fwd_flag,trip_duration',
 u'id2875421,2,2016-03-14 17:24:55,2016-03-14 17:32:30,1,-73.982154846191406,40.767936706542969,-73.964630126953125,40.765602111816406,N,455',
 u'id2377394,1,2016-06-12 00:43:35,2016-06-12 00:54:38,1,-73.980415344238281,40.738563537597656,-73.999481201171875,40.731151580810547,N,663',
 u'id3858529,2,2016-01-19 11:35:24,2016-01-19 12:10:48,1,-73.979026794433594,40.763938903808594,-74.005332946777344,40.710086822509766,N,2124',
 u'id3504673,2,2016-04-06 19:32:31,2016-04-06 19:39:40,1,-74.010040283203125,40.719970703125,-74.01226806640625,40.706718444824219,N,429']

In [87]:
#protocols = df__.map(lambda x: x[4]).distinct()
#protocols.take(10)
df_train.dtypes

[('id', 'string'),
 ('vendor_id', 'int'),
 ('pickup_datetime', 'timestamp'),
 ('dropoff_datetime', 'timestamp'),
 ('passenger_count', 'int'),
 ('pickup_longitude', 'double'),
 ('pickup_latitude', 'double'),
 ('dropoff_longitude', 'double'),
 ('dropoff_latitude', 'double'),
 ('store_and_fwd_flag', 'string'),
 ('trip_duration', 'int')]

In [88]:
type(df_train)

pyspark.sql.dataframe.DataFrame

In [146]:
xx = df_train.select('id','vendor_id','pickup_datetime').rdd
#xx = df_train.select('vendor_id').rdd
type(xx)

pyspark.rdd.RDD

In [126]:
xx.map(lambda x: x[0] == 'id2875421').take(3)

[True, False, False]

In [136]:
# RDD group 
# https://spark.apache.org/docs/1.1.1/api/python/pyspark.rdd.RDD-class.html#groupBy

result = xx.groupBy(lambda x: x[1] % 2).count()

In [157]:
xx.map(lambda x: x[0]).take(3)
#sorted([(x, sorted(y)) for (x, y) in result])

[u'id2875421', u'id2377394', u'id3858529']

In [159]:
xx.take(5)

[Row(id=u'id2875421', vendor_id=2, pickup_datetime=datetime.datetime(2016, 3, 14, 17, 24, 55)),
 Row(id=u'id2377394', vendor_id=1, pickup_datetime=datetime.datetime(2016, 6, 12, 0, 43, 35)),
 Row(id=u'id3858529', vendor_id=2, pickup_datetime=datetime.datetime(2016, 1, 19, 11, 35, 24)),
 Row(id=u'id3504673', vendor_id=2, pickup_datetime=datetime.datetime(2016, 4, 6, 19, 32, 31)),
 Row(id=u'id2181028', vendor_id=2, pickup_datetime=datetime.datetime(2016, 3, 26, 13, 30, 55))]

In [130]:
xx.map(lambda x: x[1]).take(3)

[2, 1, 2]

In [158]:
#xx.take(10)

In [66]:
df__.map(lambda x: x[0:22]).take(10)

[u'id,vendor_id,pickup_da',
 u'id2875421,2,2016-03-14',
 u'id2377394,1,2016-06-12',
 u'id3858529,2,2016-01-19',
 u'id3504673,2,2016-04-06',
 u'id2181028,2,2016-03-26',
 u'id0801584,2,2016-01-30',
 u'id1813257,1,2016-06-17',
 u'id1324603,2,2016-05-21',
 u'id1301050,1,2016-05-27']

In [77]:
# df__.map(lambda x: x[12:22]).take(10)

df__value = df__.filter(lambda line: line != header)
result = df__value.groupBy(lambda x : x[0:][12:22]).take(10)

In [55]:
header = df__.first()
df__value = df__.filter(lambda line: line != header)
df__value.map(lambda x : x[0:][12:22]).take(10)
print (df__value.map(lambda x : x[0:][12:22]).take(10))


[u'2016-03-14', u'2016-06-12', u'2016-01-19', u'2016-04-06', u'2016-03-26', u'2016-01-30', u'2016-06-17', u'2016-05-21', u'2016-05-27', u'2016-03-10']
