In [1]:
import os
# Interpreter used by the PySpark workers.
# NOTE(review): this is a user-specific absolute path; adjust it (or export
# PYSPARK_PYTHON before starting Jupyter) when running on another machine.
os.environ['PYSPARK_PYTHON'] = '/nfshome/jc9033/.conda/envs/py3/bin/python3.7'

from pyspark import SparkContext
from pyspark.sql.session import SparkSession

# getOrCreate() reuses an already running session/context, so this cell can be
# re-run without hitting "Cannot run multiple SparkContexts at once".
spark = SparkSession.builder.master('local').appName('pyspark').getOrCreate()
sc = spark.sparkContext

In [2]:
# Display the SparkContext created in the previous cell.
sc

## fhv

In [3]:
#  Generate a year-month list with a start year-month and an end year-month
def generate_year_month_list(start, end):
    """Return every "YYYY-MM" string from ``start`` to ``end``, both inclusive.

    Args:
        start: First month, formatted "YYYY-MM".
        end: Last month, formatted "YYYY-MM".

    Returns:
        List of "YYYY-MM" strings in chronological order.
    """
    first_year, last_year = int(start[:4]), int(end[:4])
    start_month, end_month = int(start[-2:]), int(end[-2:])

    # Every month of every year touched by the range, January through December.
    all_months = [
        "{}-{:02d}".format(year, month)
        for year in range(first_year, last_year + 1)
        for month in range(1, 13)
    ]

    # A negative stop index trims the unused tail of the final year; a
    # December end keeps the list through its last element.
    stop = None if end_month == 12 else end_month - 12
    return all_months[start_month - 1:stop]

In [4]:
year_month_list = generate_year_month_list("2015-01","2018-06")

# check columns for each month
for i in year_month_list:
    path = "data_fhv/fhv_" + i + ".csv"
    # Only the header line is read, so the RDD is not cached: caching every
    # monthly file here would hold executor memory for data never reused.
    header = sc.textFile(path).first()
    print(i, list(enumerate(header.split(','))))

In [5]:
# Columns of interest by period (from the header check above):
# 2015-01 to 2016-12: Pickup_date, locationID
# 2017-01 to 2017-06: Pickup_DateTime, DropOff_datetime, PUlocationID, DOlocationID
# 2017-07 to 2018-06: Pickup_DateTime, DropOff_datetime, PUlocationID, DOlocationID, SR_Flag

### Processing functions

In [None]:
def outlier_delete(x):
    """Return ``x`` unchanged if it parses to an integer in 1..263, else "0".

    Args:
        x: Raw location ID value (typically a string read from CSV).

    Returns:
        ``x`` itself when ``int(x)`` lies in [1, 263]; otherwise the string
        "0", which is also returned for values that cannot be converted.
    """
    # NOTE(review): 1..263 is presumably the valid location ID range - confirm.
    try:
        if int(x) in range(1, 264):
            return x
        return "0"
    except (TypeError, ValueError, OverflowError):
        # Only conversion failures mean "invalid ID"; anything else (e.g.
        # KeyboardInterrupt) must propagate instead of being swallowed.
        return "0"

In [6]:
def mapper1_1(partitionID, rows):
    """Turn raw CSV lines into ((date, location), 1) pairs.

    Applied to the monthly files before 2017, which have one location column.

    Args:
        partitionID: Partition index supplied by ``mapPartitionsWithIndex``.
        rows: Iterator over the partition's raw text lines.

    Yields:
        ``((fields[1][:10], fields[2]), 1)`` per CSV row, where the first ten
        characters of column 1 are used as the date key.
    """
    if partitionID == 0:
        # Skip the header line in the first partition. The default avoids a
        # StopIteration on an empty partition, which Python 3.7+ converts to
        # RuntimeError inside a generator (PEP 479).
        next(rows, None)
    import csv  # local import keeps the function self-contained
    reader = csv.reader(rows)
    for fields in reader:
        yield ((fields[1][:10], fields[2]), 1)

def mapper1_2(partitionID, rows):
    """Turn raw CSV lines into ((date, pickup, drop-off), 1) pairs.

    Applied to the monthly files from 2017 on, which have separate pickup and
    drop-off location columns.

    Args:
        partitionID: Partition index supplied by ``mapPartitionsWithIndex``.
        rows: Iterator over the partition's raw text lines.

    Yields:
        ``((fields[1][:10], fields[3], fields[4]), 1)`` per CSV row, where the
        first ten characters of column 1 are used as the date key.
    """
    if partitionID == 0:
        # Skip the header line in the first partition. The default avoids a
        # StopIteration on an empty partition, which Python 3.7+ converts to
        # RuntimeError inside a generator (PEP 479).
        next(rows, None)
    import csv  # local import keeps the function self-contained
    reader = csv.reader(rows)
    for fields in reader:
        yield ((fields[1][:10], fields[3], fields[4]), 1)
        
def mapper2_1(partition):
    """Flatten ((date, location), count) pairs into 4-field output rows.

    The location is passed through ``outlier_delete`` and the third field is
    fixed to "0" because these records carry a single location value.

    Yields:
        ``(date, cleaned location, "0", count)`` tuples.
    """
    for record in partition:
        key, trips = record[0], record[1]
        pickup_date, location = key[0], key[1]
        yield (pickup_date, outlier_delete(location), "0", trips)
        
def mapper2_2(partition):
    """Flatten ((date, pickup, drop-off), count) pairs into 4-field output rows.

    Both location values are passed through ``outlier_delete``.

    Yields:
        ``(date, cleaned pickup, cleaned drop-off, count)`` tuples.
    """
    for record in partition:
        key, trips = record[0], record[1]
        pickup_date, pickup, dropoff = key[0], key[1], key[2]
        yield (pickup_date, outlier_delete(pickup), outlier_delete(dropoff), trips)

### Cleaning & Integrating

In [7]:
%%time
for i in year_month_list:
    print(i)
    path = "data_fhv/fhv_" + i + ".csv"
    if i == "2015-01":
        rdd = sc.textFile(path)\
                 .mapPartitionsWithIndex(mapper1_1)\
                 .groupByKey().mapValues(sum)\
                 .mapPartitions(mapper2_1)
        rdd.take(5)
    else:
        if int(i[:4])<2017:
            temp = sc.textFile(path)\
                     .mapPartitionsWithIndex(mapper1_1)\
                     .groupByKey().mapValues(sum)\
                     .mapPartitions(mapper2_1)
        else:
            temp = sc.textFile(path)\
                     .mapPartitionsWithIndex(mapper1_2)\
                     .groupByKey().mapValues(sum)\
                     .mapPartitions(mapper2_2)
        rdd = rdd.union(temp)
        rdd.take(5)

2015-01
2015-02
2015-03
2015-04
2015-05
2015-06
2015-07
2015-08
2015-09
2015-10
2015-11
2015-12
2016-01
2016-02
2016-03
2016-04
2016-05
2016-06
2016-07
2016-08
2016-09
2016-10
2016-11
2016-12
2017-01
2017-02
2017-03
2017-04
2017-05
2017-06
2017-07
2017-08
2017-09
2017-10
2017-11
2017-12
2018-01
2018-02
2018-03
2018-05
2018-06
CPU times: user 780 ms, sys: 168 ms, total: 948 ms
Wall time: 34min 16s


### Output CSV

In [8]:
%%time
from pyspark import SparkContext
df = spark.createDataFrame(rdd, ['date', 'PUlocationID', 'DOlocationID', 'count'])

CPU times: user 272 ms, sys: 72 ms, total: 344 ms
Wall time: 3.02 s


In [9]:
# Print the first rows of the DataFrame as a text table.
df.show()

+----------+------------+------------+-----+
|      date|PUlocationID|DOlocationID|count|
+----------+------------+------------+-----+
|2015-01-01|           0|           0|10574|
|2015-01-03|           0|           0|13391|
|2015-01-04|           0|           0|13173|
|2015-01-13|           0|           0|21628|
|2015-01-15|           0|           0|24166|
|2015-01-16|           0|           0|21363|
|2015-01-19|           0|           0|14428|
|2015-01-22|           0|           0|21827|
|2015-01-28|           0|           0|19264|
|2015-01-30|           0|           0|20839|
|2015-01-01|         141|           0|  585|
|2015-01-01|         191|           0|   27|
|2015-01-01|         216|           0|   55|
|2015-01-02|         181|           0|  512|
|2015-01-02|         143|           0|  290|
|2015-01-02|          13|           0|  421|
|2015-01-02|           0|           0| 2506|
|2015-01-02|          70|           0|   39|
|2015-01-02|          65|           0|  168|
|2015-01-0

In [10]:
%%time
df.coalesce(1).write.format('com.databricks.spark.csv').options(header='true').save('output_fhv')

CPU times: user 12 ms, sys: 4 ms, total: 16 ms
Wall time: 3min 12s


### Check file

In [11]:
# List the CSV part file(s) in output_fhv by exact ".csv" suffix. The shell
# pipeline `ls | grep .csv` treated "." as a regex wildcard and matched
# anywhere in the name; hidden files are skipped here as `ls` would skip them.
filename = sorted(
    name for name in os.listdir('output_fhv')
    if name.endswith('.csv') and not name.startswith('.')
)

In [12]:
import pandas as pd

# Load the first listed CSV file into pandas, replacing missing values with 0,
# and report the resulting (rows, columns) shape.
csv_path = 'output_fhv/' + filename[0]
pddf = pd.read_csv(csv_path).fillna(0)
print(pddf.shape)

(12899039, 4)


In [13]:
# Show the first rows of the loaded frame.
pddf.head()

Unnamed: 0,date,PUlocationID,DOlocationID,count
0,2015-01-01,0,0,10574
1,2015-01-03,0,0,13391
2,2015-01-04,0,0,13173
3,2015-01-13,0,0,21628
4,2015-01-15,0,0,24166


In [14]:
# Show the last rows of the loaded frame.
pddf.tail()

Unnamed: 0,date,PUlocationID,DOlocationID,count
12899034,2018-06-08,263,71,1
12899035,2018-06-06,174,14,1
12899036,2018-06-06,232,174,1
12899037,2018-06-29,20,25,1
12899038,2018-06-20,158,10,1
