In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import UserDefinedFunction,isnan, when, count, col, isnull,month, hour,year,minute,second
from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType, DateType
from pyspark.sql import DataFrame
from functools import reduce
import glob
import dateparser

In [2]:
# Build (or reuse) the Spark session against the standalone cluster.
# Resource settings are applied in order as (key, value) pairs before the
# master URL and application name are set.
spark = reduce(
    lambda builder, setting: builder.config(*setting),
    [
        ('spark.executor.memory', '40g'),
        ('spark.executor.cores', '40'),
        ('spark.driver.memory', '20g'),
    ],
    SparkSession.builder,
).master("spark://Dis-iMac-Pro:7077").appName("apc").getOrCreate()

In [3]:
# Legacy SQLContext handle, bound explicitly to the session created above;
# the CSV load reads through it.
sqlContext = SQLContext(spark.sparkContext, sparkSession=spark)

In [4]:
# Load every raw .TXT export (header row, inferred column types), then keep one
# row per trip/date/route/direction/stop/sort-order combination.
df = (
    sqlContext.read
    .options(header=True, inferSchema=True)
    .csv('/Users/abhishek/Documents/carta-raw-data/*.TXT')
    .dropDuplicates(['TRIP_KEY', 'SURVEY_DATE', 'ROUTE_NUMBER', 'DIRECTION_NAME', 'STOP_ID', 'SORT_ORDER'])
)

In [6]:
def convertdate(x):
    """Parse the date portion of ``x`` with ``dateparser``.

    ``x`` is converted to a string and only the text before the first space is
    parsed.

    Args:
        x: the raw SURVEY_DATE value handed over by Spark (may be None for a
           SQL NULL).

    Returns:
        Whatever ``dateparser.parse`` returns for that text, or ``None`` when
        ``x`` is None or parsing raises.
    """
    if x is None:
        # Without this guard a NULL would be parsed as the literal text "None".
        return None
    try:
        return dateparser.parse(str(x).split(" ")[0])
    except Exception:
        # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
        # still propagate; ordinary parse failures become NULL.
        return None
def convertstamp(x):
    """Parse the time-of-day portion of ``x`` with ``dateparser``.

    ``x`` is converted to a string and the text after the first space is
    parsed.

    Args:
        x: a raw time value handed over by Spark (may be None for a SQL NULL).

    Returns:
        Whatever ``dateparser.parse`` returns for that text, or ``None`` when
        ``x`` is None, has no space-separated second part, or parsing raises.

    NOTE(review): only a time string is passed to dateparser, so the date part
    of the result is filled in by dateparser. The saved outputs show these
    columns stamped 2021-05-12 regardless of survey_date, which appears to be
    the date the notebook ran. Only the hour/minute/second parts derived from
    these values look meaningful. Confirm before relying on the full timestamp.
    """
    if x is None:
        return None
    try:
        return dateparser.parse(str(x).split(" ")[1])
    except Exception:
        # Narrowed from a bare ``except:``. IndexError (no time part) and parse
        # errors become NULL; KeyboardInterrupt/SystemExit propagate.
        return None
# Spark UDF wrappers around the two parsers above. Both declare TimestampType;
# the survey-date result is cast to DateType where it is applied.
# The functions are wrapped directly (not via anonymous lambdas) so query plans
# show their real names, using the public `udf` helper instead of constructing
# UserDefinedFunction by hand.
converttimeudf = F.udf(convertstamp, TimestampType())
convertdateudf = F.udf(convertdate, TimestampType())

In [7]:
# Replace each raw time column with its parsed timestamp, then replace
# SURVEY_DATE with its parsed value cast to DateType. withColumn replaces an
# existing column of the same name; the UDFs read from the raw `df` columns.
df3 = reduce(
    lambda frame, name: frame.withColumn(name, converttimeudf(df[name])),
    ["TIME_SCHEDULED", "TRIP_START_TIME", "TIME_ACTUAL_ARRIVE", "TIME_ACTUAL_DEPART"],
    df,
).withColumn("SURVEY_DATE", convertdateudf(df["SURVEY_DATE"]).cast(DateType()))
# Append MONTH and YEAR from SURVEY_DATE, then <COL>_HOUR, <COL>_MIN, <COL>_SEC
# for each parsed time column, in that order. Each spec is
# (new column name, extractor function, source column).
df = reduce(
    lambda frame, spec: frame.withColumn(spec[0], spec[1](df3[spec[2]])),
    [("MONTH", month, "SURVEY_DATE"), ("YEAR", year, "SURVEY_DATE")]
    + [
        (source + "_" + suffix, extract, source)
        for source in ("TIME_SCHEDULED", "TRIP_START_TIME", "TIME_ACTUAL_ARRIVE", "TIME_ACTUAL_DEPART")
        for suffix, extract in (("HOUR", hour), ("MIN", minute), ("SEC", second))
    ],
    df3,
)

In [8]:
# Normalise DIRECTION_NAME: fix the "OUTYBOUND" typo and map the numeric codes
# "0"/"1" to OUTBOUND/INBOUND; every other value is kept unchanged.
df = df.withColumn(
    'DIRECTION_NAME',
    F.when(df['DIRECTION_NAME'] == "OUTYBOUND", "OUTBOUND")
    .when(df['DIRECTION_NAME'] == "0", "OUTBOUND")
    .when(df['DIRECTION_NAME'] == "1", "INBOUND")
    .otherwise(df['DIRECTION_NAME']),
)

In [20]:
# Rename the trip key column to trip_id. In top-to-bottom order the CSV header
# names are still upper-case here (they are lower-cased in the following cell),
# so refer to the column by its actual name instead of relying on Spark's
# case-insensitive resolution. withColumnRenamed is a silent no-op when the
# column is missing, so verify the rename took effect.
df = df.withColumnRenamed('TRIP_KEY', 'trip_id')
assert 'trip_id' in df.columns, "expected a TRIP_KEY column to rename to trip_id"

In [9]:
# Lower-case every column name, keeping column order.
df = df.select(*(F.col(name).alias(name.lower()) for name in df.columns))

In [11]:
# Show the top rows of a DataFrame when it is the last expression of a cell.
spark.conf.set(key="spark.sql.repl.eagerEval.enabled", value=True)

In [12]:
#df.limit(2)

serial_number,schedule_id,schedule_name,signup_name,survey_date,survey_status,survey_type,survey_source,pattern_id,route_number,route_name,direction_name,branch,service_code,service_type,service_class,service_mode,trip_start_time,time_period,service_period,trip_number,trip_key,block_number,block_key,block_id,block_name,run_number,run_key,vehicle_number,vehicle_description,vehicle_seats,revenue_start,revenue_end,revenue_net,odom_start,odom_end,odom_net,condition_number,checker_name,garage_name,division_name,operator_id,farebox,match_count,comments,sort_order,stop_id,main_cross_street,travel_direction,timepoint,segment_miles,time_scheduled,time_actual_arrive,time_actual_depart,dwell_time,running_time_actual,passengers_on,passengers_off,passengers_in,passengers_spot,wheelchairs,bicycles,match_distance,timepoint_miles,non_student_fare,child,nr_board,nr_alight,kneels,comment_number,checker_time,first_last_stop,month,year,time_scheduled_hour,time_scheduled_min,time_scheduled_sec,trip_start_time_hour,trip_start_time_min,trip_start_time_sec,time_actual_arrive_hour,time_actual_arrive_min,time_actual_arrive_sec,time_actual_depart_hour,time_actual_depart_min,time_actual_depart_sec
4689608,106,Aug18 (Weekday),August 19 2018,2019-04-02,2,1,3,914,1.4,Route #1 am HC:1,OUTBOUND,[1]ALTON PARK PM OB,Route #1,,,Bus,2021-05-12 04:55:00,AM Peak,Weekday,1,132994,102,1011,0,,3,3,129,Gillig HF 2002,30,,,,,,,0,,***Unknown Garage***,,160539,,24,Trip starts at 4:...,300,27,MARKET/W 25TH,S,0,0.06,,2021-05-12 05:01:21,2021-05-12 05:01:21,0.0,,0,0,0,,0,0,1,,,,,,0,,,2,4,2019,,,,4,55,0,5,1,21,5,1,21
4695844,106,Aug18 (Weekday),August 19 2018,2019-04-09,2,1,3,914,1.4,Route #1 am HC:1,OUTBOUND,[1]ALTON PARK PM OB,Route #1,,,Bus,2021-05-12 04:55:00,AM Peak,Weekday,1,132994,102,1011,0,,3,3,128,Gillig HF 2002,30,,,,,,,0,,***Unknown Garage***,,160539,,24,Trip starts at 4:...,350,100048,HalsCent,X,-1,,2021-05-12 05:25:00,2021-05-12 05:24:32,2021-05-12 05:24:33,,,0,0,5,,0,0,241,0.0,,,,,0,,,3,4,2019,5.0,25.0,0.0,4,55,0,5,24,32,5,24,33


In [16]:
# Columns removed before the dataset is written to parquet.
# Each name is listed once: DataFrame.drop ignores repeats, but duplicates make
# the list harder to audit (signup_name and odom_end were previously repeated).
columns_to_drop = [
    'condition_number', 'odom_end', 'division_name', 'garage_name',
    'checker_time', 'signup_name', 'comments', 'comment_number',
    'non_student_fare', 'checker_name', 'timepoint', 'schedule_id',
    'odom_start', 'schedule_name', 'time_period', 'kneels',
    'revenue_start', 'nr_board', 'nr_alight', 'revenue_end',
    'running_time_actual', 'dwell_time',
]
assert len(columns_to_drop) == len(set(columns_to_drop)), "duplicate entries in columns_to_drop"

In [17]:
# Remove each listed column in turn; names not present are ignored by drop.
df = reduce(DataFrame.drop, columns_to_drop, df)

In [None]:
# Persist the cleaned dataset as parquet, partitioned by year/month.
# mode("append") wrote a second full copy of every row on each re-run (the input
# is the whole raw directory, deduplicated). Overwrite with dynamic
# partitionOverwriteMode replaces only the (year, month) partitions present in
# this DataFrame and leaves other partitions in place, so re-running is idempotent.
(
    df.write
    .option("mapreduce.fileoutputcommitter.algorithm.version", "2")
    .option("partitionOverwriteMode", "dynamic")
    .partitionBy("year", "month")
    .mode("overwrite")
    .format("parquet")
    .save("/Users/abhishek/spark/carta/apcdata")
)

In [4]:
# Reload the partitioned dataset written above. The format is stated explicitly
# to match the writer instead of depending on spark.sql.sources.default.
df = spark.read.load('/Users/abhishek/spark/carta/apcdata', format='parquet')

In [5]:
# Repeats the earlier eager-evaluation setting so this reload section also
# shows DataFrame previews when run on its own in a fresh session.
spark.conf.set(key="spark.sql.repl.eagerEval.enabled", value=True)

In [7]:
# Preview two rows of the reloaded data (rendered via eager evaluation).
df.limit(num=2)

serial_number,survey_date,survey_status,survey_type,survey_source,pattern_id,route_number,route_name,direction_name,branch,service_code,service_type,service_class,service_mode,trip_start_time,service_period,trip_number,trip_id,block_number,block_key,block_id,block_name,run_number,run_key,vehicle_number,vehicle_description,vehicle_seats,revenue_net,odom_net,operator_id,farebox,match_count,sort_order,stop_id,main_cross_street,travel_direction,segment_miles,time_scheduled,time_actual_arrive,time_actual_depart,passengers_on,passengers_off,passengers_in,passengers_spot,wheelchairs,bicycles,match_distance,timepoint_miles,child,first_last_stop,time_scheduled_hour,time_scheduled_min,time_scheduled_sec,trip_start_time_hour,trip_start_time_min,trip_start_time_sec,time_actual_arrive_hour,time_actual_arrive_min,time_actual_arrive_sec,time_actual_depart_hour,time_actual_depart_min,time_actual_depart_sec,year,month
4706115,2019-04-22,2,1,3,914,1.4,Route #1 am HC:1,OUTBOUND,[1]ALTON PARK PM OB,Route #1,,,Bus,2021-05-12 04:55:00,Weekday,1,132994,102,1011,0,,3,3,134,Gillig HF 2003,30,,,160539,,24,260,23,MARKET/19TH,S,0.06,,2021-05-12 04:59:41,2021-05-12 04:59:41,0,0,0,,0,0,0,,,2,,,,4,55,0,4,59,41,4,59,41,2019,4
4713165,2019-04-30,2,1,3,914,1.4,Route #1 am HC:1,OUTBOUND,[1]ALTON PARK PM OB,Route #1,,,Bus,2021-05-12 04:55:00,Weekday,1,132994,102,1011,0,,3,3,118,Gillig HF 2002,30,,,160539,,24,270,24,MARKET/20TH,S,0.06,,2021-05-12 04:58:30,2021-05-12 04:58:30,0,0,1,,0,0,0,,,2,,,,4,55,0,4,58,30,4,58,30,2019,4


In [8]:
# Print the schema of the reloaded parquet dataset as a tree; the partition
# columns year/month are listed last.
df.printSchema()

root
 |-- serial_number: integer (nullable = true)
 |-- survey_date: date (nullable = true)
 |-- survey_status: integer (nullable = true)
 |-- survey_type: integer (nullable = true)
 |-- survey_source: integer (nullable = true)
 |-- pattern_id: integer (nullable = true)
 |-- route_number: double (nullable = true)
 |-- route_name: string (nullable = true)
 |-- direction_name: string (nullable = true)
 |-- branch: string (nullable = true)
 |-- service_code: string (nullable = true)
 |-- service_type: string (nullable = true)
 |-- service_class: string (nullable = true)
 |-- service_mode: string (nullable = true)
 |-- trip_start_time: timestamp (nullable = true)
 |-- service_period: string (nullable = true)
 |-- trip_number: integer (nullable = true)
 |-- trip_id: integer (nullable = true)
 |-- block_number: integer (nullable = true)
 |-- block_key: string (nullable = true)
 |-- block_id: integer (nullable = true)
 |-- block_name: integer (nullable = true)
 |-- run_number: integer (nullab