In [1]:
from pyspark import SparkContext
# Reuse the SparkContext already running in this process if there is one;
# otherwise create a new one with the default configuration.
sc = SparkContext.getOrCreate()

In [2]:
# Akanksha's Preprocessing
# For Subscribers and Customers separately, compute per day of week:
#   * the number of trips, and
#   * the average trip duration in minutes.
# Both are saved to S3 for EDA.

import datetime

# Format of the trip.csv start_date column (field 2)
TRIP_DATE_FORMAT = '%m/%d/%Y %H:%M'


def _day_of_week(timestamp):
    """Return the weekday name (``strftime('%A')``) of a trip timestamp string."""
    # Local import: later in this notebook `from datetime import datetime`
    # rebinds the global name `datetime` to the class, which would break a
    # `datetime.datetime.strptime` lookup if these RDDs are re-evaluated.
    from datetime import datetime as _datetime
    return _datetime.strptime(timestamp, TRIP_DATE_FORMAT).strftime('%A')


def _add(a, b):
    """Binary sum used as the reduceByKey combiner."""
    return a + b


# Read trip.csv from S3 and split each line into fields
trip_rdd = sc.textFile('s3://msds694.proj/data/trip.csv', 24)
trips_rdd = trip_rdd.map(lambda line: line.split(","))
# Remove the header row. first() only reads the first partition.
header = trips_rdd.first()
# Cached because both subscription groups below are derived from it three times each
trips_rdd_rows = trips_rdd.filter(lambda row: row != header).cache()
# Split by subscription type (field 9). Exact matches keep the two groups
# disjoint, so a row with any other value is never counted in both.
trips_rdd_rows_subscribers = trips_rdd_rows.filter(lambda row: row[9].strip() == 'Subscriber')
trips_rdd_rows_customers = trips_rdd_rows.filter(lambda row: row[9].strip() == 'Customer')

# Number of trips per day of week: (day of week, 1) pairs summed per key.
# reduceByKey combines map-side instead of shuffling every value like groupByKey.
trips_subscriber_dow_subtype = trips_rdd_rows_subscribers.map(lambda row: (_day_of_week(row[2]), 1))
trips_customer_dow_subtype = trips_rdd_rows_customers.map(lambda row: (_day_of_week(row[2]), 1))
trips_sub_dow_subtype_grouped = trips_subscriber_dow_subtype.reduceByKey(_add)
trips_cus_dow_subtype_grouped = trips_customer_dow_subtype.reduceByKey(_add)
# Save for EDA
trips_sub_dow_subtype_grouped.saveAsTextFile('s3://msds694.proj/data/sub_total_trip_by_dow')
trips_cus_dow_subtype_grouped.saveAsTextFile('s3://msds694.proj/data/cus_total_trip_by_dow')

# Total trip duration per day of week, in minutes (field 1 is divided by 60,
# presumably seconds -> minutes), rounded to 2 decimals.
trips_sub_dow_duration = trips_rdd_rows_subscribers.map(lambda row: (_day_of_week(row[2]), int(row[1])))
trips_cus_dow_duration = trips_rdd_rows_customers.map(lambda row: (_day_of_week(row[2]), int(row[1])))
trips_sub_dow_duration_sum = trips_sub_dow_duration.reduceByKey(_add).mapValues(lambda total: round(total / 60, 2))
trips_cus_dow_duration_sum = trips_cus_dow_duration.reduceByKey(_add).mapValues(lambda total: round(total / 60, 2))

# Number of distinct trip ids (field 0) per day of week
trips_sub_dow_id = trips_rdd_rows_subscribers.map(lambda row: (_day_of_week(row[2]), row[0]))
trips_cus_dow_id = trips_rdd_rows_customers.map(lambda row: (_day_of_week(row[2]), row[0]))
trips_sub_dow_id_count = trips_sub_dow_id.distinct().mapValues(lambda _: 1).reduceByKey(_add)
trips_cus_dow_id_count = trips_cus_dow_id.distinct().mapValues(lambda _: 1).reduceByKey(_add)

# Join (day of week, total minutes) with (day of week, trip count) and divide
# to get the average trip duration in minutes per day of week.
trips_sub_dow_id_ct_dur = trips_sub_dow_duration_sum.join(trips_sub_dow_id_count)
trips_cus_dow_id_ct_dur = trips_cus_dow_duration_sum.join(trips_cus_dow_id_count)
trips_sub_dow_avg_dur = trips_sub_dow_id_ct_dur.mapValues(lambda x: x[0] / x[1])
trips_cus_dow_avg_dur = trips_cus_dow_id_ct_dur.mapValues(lambda x: x[0] / x[1])
# Save for EDA
trips_sub_dow_avg_dur.saveAsTextFile('s3://msds694.proj/data/sub_avg_trip_duration_by_dow')
trips_cus_dow_avg_dur.saveAsTextFile('s3://msds694.proj/data/cus_avg_trip_duration_by_dow')


# Esther's preprocessing
# Join daily trip counts (per subscription type) with weather for zip 94107.
# Then, for each (weather column 2, subscription type) key ("temp") and each
# (weather column 19, subscription type) key ("rain"), report
# (number of days, mean trips per day).


def _trip_key_value(line):
    """Map a trip.csv line to ((start date, subscription type), (trip id, duration))."""
    fields = line.split(',')
    start = fields[2]
    # Keep the date part of the start timestamp (text before the first space)
    return ((start[:start.find(' ')], fields[9]), (fields[0], fields[1]))


def _weather_key_value(line):
    """Map a weather.csv line to (date, (column 2, column 19, column 23)).

    Column 23 is compared with the zip code '94107' below; columns 2 and 19
    are presumably mean temperature and precipitation (saved as temp / rain).
    """
    fields = line.split(',')
    return (fields[0], (fields[2], fields[19], fields[23]))


def _count_and_mean(pairs):
    """For an RDD of (key, number), return (key, (number of values, mean value))."""
    totals = pairs.aggregateByKey(
        (0, 0),
        lambda acc, value: (acc[0] + value, acc[1] + 1),
        lambda left, right: (left[0] + right[0], left[1] + right[1]))
    return totals.mapValues(lambda acc: (acc[1], acc[0] / acc[1]))


rdd1 = sc.textFile('s3://msds694.proj/data/trip.csv', 8)
lines1 = rdd1.map(_trip_key_value)
# Number of trips per (date, subscription type)
pair1 = lines1.mapValues(lambda _: 1).reduceByKey(lambda a, b: a + b)
# Re-key by date: (date, (subscription type, trip count))
pair2 = pair1.map(lambda kv: (kv[0][0], (kv[0][1], kv[1])))

rdd = sc.textFile('s3://msds694.proj/data/weather.csv', 24)
lines2 = rdd.map(_weather_key_value)
lines3 = lines2.filter(lambda kv: kv[1][2] == '94107')
# Inner join: a left outer join yields (weather, None) for days without trips,
# and indexing that None below would raise TypeError.
# Cached because both temp and rain are derived from it.
joined = lines3.join(pair2).cache()
# ((weather value, subscription type), trips that day)
temp = joined.map(lambda kv: ((kv[1][0][0], kv[1][1][0]), kv[1][1][1]))
rain = joined.map(lambda kv: ((kv[1][0][1], kv[1][1][0]), kv[1][1][1]))
temp_mean = _count_and_mean(temp)
rain_mean = _count_and_mean(rain)
temp_mean.saveAsTextFile("s3://msds694.proj/data/Esther/temp")
rain_mean.saveAsTextFile("s3://msds694.proj/data/Esther/rain")


# Marine's preprocessing
# Trip counts per (start station id, end station id), with station.csv
# columns 1-3 of both stations attached (column 1 is the station name;
# 2-3 presumably lat/long).

station = sc.textFile('s3://msds694.proj/data/station.csv', 24)
# Read trip.csv here so this section does not rely on an RDD named `trip`
# that is defined nowhere else in the notebook.
trip = sc.textFile('s3://msds694.proj/data/trip.csv', 24)
# Drop the header rows (both files start with an 'id' column)
station_c = (station.filter(lambda line: not line.startswith('id'))
                    .map(lambda line: line.split(',')[0:4]))
trip_c = (trip.filter(lambda line: not line.startswith('id'))
              .map(lambda line: line.split(',')[1:8]))
# trip_c[3] / trip_c[6] are trip.csv fields 4 / 7: start / end station ids
key_pair = trip_c.map(lambda x: ((x[3], x[6]), 1))
# (start_id, end_id, trip count)
start_end_count = (key_pair.reduceByKey(lambda a, b: a + b)
                           .map(lambda kv: (kv[0][0], kv[0][1], kv[1])))
station_pair = station_c.map(lambda x: (x[0], (x[1], x[2], x[3])))
start_key = start_end_count.map(lambda x: (x[0], (x[1], x[2])))
# (start_id, end_id, count, start_col1, start_col2, start_col3)
df1 = start_key.join(station_pair).map(lambda x: (x[0], x[1][0][0], x[1][0][1],
                                                  x[1][1][0], x[1][1][1], x[1][1][2]))
df2 = df1.map(lambda x: (x[1], (x[0], x[2], x[3], x[4], x[5])))
# (end_id, start_id, count, start_col1..3, end_col1..3)
df_full = df2.join(station_pair).map(lambda x: (x[0], x[1][0][0], x[1][0][1], x[1][0][2],
                                                x[1][0][3], x[1][0][4],
                                                x[1][1][0], x[1][1][1], x[1][1][2]))
# NOTE(review): df_full is never saved or collected in this section, so no
# Spark job actually runs here.


# Lexie's preprocessing
# Durations (in minutes) of trips shorter than an hour, split into
# Subscriber and Customer trips, saved to S3.

# Exclusive upper bound on kept trip durations, in minutes
MAX_TRIP_MINUTES = 60


def _short_trip_minutes(trips, subscription_type, max_minutes=MAX_TRIP_MINUTES):
    """Durations in minutes, below ``max_minutes``, of rows with the given subscription type.

    Field 9 is the subscription type. Field 1 is divided by 60 to get minutes
    (presumably it holds seconds).
    """
    return (trips.filter(lambda row: row[9] == subscription_type)
                 .map(lambda row: float(row[1]) / 60)
                 .filter(lambda minutes: minutes < max_minutes))


# Split each line into fields (after removing every ';') and drop the header row
rdd = (sc.textFile('s3://msds694.proj/data/trip.csv', 24)
         .map(lambda line: line.replace(';', '').split(","))
         .filter(lambda row: row[0] != 'id'))
subscriber_1 = _short_trip_minutes(rdd, 'Subscriber')
customer_1 = _short_trip_minutes(rdd, 'Customer')
# save to S3
subscriber_1.saveAsTextFile("s3://msds694.proj/data/Lexie/subscriber")
customer_1.saveAsTextFile("s3://msds694.proj/data/Lexie/customer")

# Kevin's preprocessing

from pyspark import SparkContext
from datetime import datetime
from dateutil.parser import parse
from pyspark.sql.functions import udf, to_date, to_utc_timestamp
import numpy as np

station_path = 's3://msds694.proj/data/station.csv'
status_path = 's3://msds694.proj/data/status.csv'
# NOTE(review): trip_path and weather_path are not read anywhere in this section.
trip_path = 's3://msds694.proj/data/trip.csv'
weather_path = 's3://msds694.proj/data/weather.csv'
status_rdd = sc.textFile(status_path)
station_rdd = sc.textFile(station_path)
# First line of each file, kept for reference. The pipelines below drop the
# header lines by prefix ('station_id' / 'id'), not by using these values.
status_header = status_rdd.take(1)
station_header = station_rdd.take(1)
# Column indices of a split status.csv row
STATION_ID = 0
BIKES_AVAILABLE = 1
DOCKS_AVAILABLE = 2
TIME = 3
def change_col(x, col_num, func, **kwargs):
    """Apply ``func`` to one column of a row, in place.

    ``x[col_num]`` is replaced by ``func(x[col_num], **kwargs)``. The same
    (mutated) ``x`` is returned, so the call can be used inside ``RDD.map``.
    """
    current = x[col_num]
    updated = func(current, **kwargs)
    x[col_num] = updated
    return x
def makekey(x, *args):
    """Return ``(key, x)``, where ``key`` is the tuple of ``x``'s items at the given indices.

    The key follows the order of ``args``: ``makekey(row, 2, 0)`` gives
    ``((row[2], row[0]), row)``. Items are looked up by direct indexing, so an
    index past the end of ``x`` raises ``IndexError`` rather than being
    silently skipped. ``x`` itself is returned unchanged as the value.
    """
    key = tuple(x[idx] for idx in args)
    return (key, x)
def flatten(x, iterations=1):
    """Return the sum of the values half of a ``(key, values)`` pair.

    Despite the name, nothing is flattened: this is ``sum(x[1])``, so the
    values must be numbers. ``iterations`` is accepted but never used.
    """
    values = x[1]
    return sum(value for value in values)
# Inclusive window of status timestamps kept for the analysis
STATUS_START = datetime(2013, 12, 29)
STATUS_END = datetime(2015, 1, 1)

# Parse status.csv into rows of
# [station_id, bikes_available, docks_available, time (datetime), time truncated to the hour].
status_rdd_processed = (status_rdd.filter(lambda line: not line.startswith('station_id'))
                                  .map(lambda line: line.split(','))
                                  .map(lambda x: x[:TIME] + [x[TIME].replace('/', '-')])
                                  .map(lambda x: change_col(x, TIME, parse))
                                  .filter(lambda x: STATUS_START <= x[TIME] <= STATUS_END)
                                  .map(lambda x: x + [datetime(x[TIME].year, x[TIME].month,
                                                               x[TIME].day, x[TIME].hour)]))
# Cache and materialise the parsed rows now (dateutil parsing is the costly step)
status_rdd_processed.cache().count()
status_rdd_proccessed_key = status_rdd_processed.map(lambda x: makekey(x, STATION_ID))
# San Francisco stations keyed by station id: ((id,), station_row)
station_rdd_processed = (station_rdd.filter(lambda line: not line.startswith('id'))
                                    .map(lambda line: line.split(','))
                                    .filter(lambda x: x[5] == 'San Francisco')
                                    .map(lambda x: makekey(x, 0)))
# Joined row: status columns 0-4 followed by station columns at 5-11
# (5 = station id, 6 = station name, 10 = city).
status_rdd_join_sf_station = (status_rdd_proccessed_key.join(station_rdd_processed)
                              .map(lambda kv: kv[1][0] + kv[1][1])
                              .cache())
# Key each joined row by (hour bucket, station name)
status_rdd_processed_gb_id_hour = status_rdd_join_sf_station.map(lambda x: makekey(x, 4, 6))
# Earliest reading timestamp within each (hour, station)
min_timestamp_per_hour = (status_rdd_processed_gb_id_hour
                          .map(lambda kv: (kv[0], kv[1][TIME]))
                          .reduceByKey(min)
                          .cache())
# Keep only the earliest reading(s) of each hour per station (ties are all
# kept), then drop the appended minimum timestamp (column 12).
df = (status_rdd_processed_gb_id_hour.join(min_timestamp_per_hour)
      .map(lambda kv: kv[1][0] + [kv[1][1]])
      .filter(lambda x: x[TIME] == x[12])
      .map(lambda x: x[0:12])
      .cache())
def is_weekend(date):
    """Return True if ``date`` falls on a Saturday or Sunday.

    Uses ``weekday()`` (Monday == 0 ... Sunday == 6) rather than comparing
    ``strftime('%A')`` with English day names. The name comparison silently
    returns False for every date when the process runs under a non-English
    locale.

    :param date: a ``datetime.date`` or ``datetime.datetime``.
    :returns: bool
    """
    return date.weekday() >= 5
# Station table keyed by station name: ((name,), station_row)
station_name_key = (station_rdd_processed
                    .map(lambda keyed: keyed[1])
                    .map(lambda station_row: makekey(station_row, 1)))


def _pct_available_by_hour(row):
    """Key one filtered status/station row for averaging.

    Returns ``((hour, weekend_flag, station_name), fraction)``, where the
    fraction is bikes_available (column 1) divided by joined column 9
    (station column 4, presumably dock_count).
    """
    timestamp = row[4]
    key = (timestamp.hour, is_weekend(timestamp), row[6])
    return key, int(row[1]) / int(row[9])


# Average bike-availability fraction per (hour, weekend flag, station name),
# flattened to [hour, weekend_flag, station_name, mean], then joined back to
# the station table by name and flattened again.
df_agg = (df.map(_pct_available_by_hour)
            .aggregateByKey((0, 0),
                            lambda acc, pct: (acc[0] + pct, acc[1] + 1),
                            lambda left, right: (left[0] + right[0], left[1] + right[1]))
            .map(lambda kv: list(kv[0]) + [kv[1][0] / kv[1][1]])
            .map(lambda agg_row: makekey(agg_row, 2))
            .join(station_name_key)
            .map(lambda kv: kv[1][0] + kv[1][1]))
def toCSVLine(data):
    """Join the ``str()`` of every item in ``data`` with commas (no quoting or escaping)."""
    return ','.join(map(str, data))
# Serialise each aggregated row as one comma-joined text line and write the
# result to S3. saveAsTextFile writes a directory of part files at this path
# (the ".csv" suffix just names that directory).
lines = df_agg.map(toCSVLine)
lines.saveAsTextFile('s3://msds694.proj/data/marine/test_data_kevin_test.csv')

Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.IOException: No FileSystem for scheme: s3
	at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
	at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:258)
	at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
	at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
	at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:204)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.api.python.PythonRDD.getPartitions(PythonRDD.scala:55)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:253)
	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:251)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:251)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
	at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
	at org.apache.spark.api.python.PythonRDD$.collectAndServe(PythonRDD.scala:166)
	at org.apache.spark.api.python.PythonRDD.collectAndServe(PythonRDD.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)


In [None]:
# Stop the SparkContext obtained at the top of the notebook once all
# preprocessing jobs have finished.
sc.stop()