In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
#Feel free to add if you want.
from pyspark.ml.feature import StringIndexer
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.classification import LogisticRegression

In [20]:
# All four source collections are read from the same MongoDB database, so the
# connector name and the database URI are kept in one place: changing the host
# now means editing a single line.
# NOTE(review): `spark` is not created in this part of the notebook; it is
# expected to be provided by the environment (e.g. the pyspark shell) - confirm.
MONGO_SOURCE_FORMAT = 'com.mongodb.spark.sql.DefaultSource'
MONGO_DATABASE_URI = 'mongodb://54.186.104.247/bikeshare'


def load_collection(collection):
    """Load one collection of the bikeshare database as a DataFrame.

    collection -- collection name, e.g. 'trip'.  The connector is given the
                  URI MONGO_DATABASE_URI + '.' + collection.
    """
    uri = '{}.{}'.format(MONGO_DATABASE_URI, collection)
    return spark.read.format(MONGO_SOURCE_FORMAT).option('uri', uri).load()


weather = load_collection('weather')
trip = load_collection('trip')
station = load_collection('station')
status = load_collection('status')

In [72]:
# Read the CSV with a header row and let Spark infer the column types.
# NOTE(review): the path is an absolute location on one machine; it has to be
# adjusted before this cell can run anywhere else.
csv_reader = sqlContext.read.format('com.databricks.spark.csv')
csv_reader = csv_reader.options(header='true', inferschema='true')
table = csv_reader.load('/Users/ty/Downloads/bikeDF2.csv')

## 1. Preprocessing - Flatten DataFrame

In [24]:
# The frame comes from a MongoDB collection; discard its `_id` column and
# list the columns that remain.
station = station.drop('_id')
station.printSchema()

root
 |-- city: string (nullable = true)
 |-- dock_count: integer (nullable = true)
 |-- installation_date: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
 |-- name: string (nullable = true)
 |-- station_id: integer (nullable = true)



In [25]:
# The frame comes from a MongoDB collection; discard its `_id` column and
# list the columns that remain.
status = status.drop('_id')
status.printSchema()

root
 |-- bikes_available: integer (nullable = true)
 |-- docks_available: integer (nullable = true)
 |-- station_id: integer (nullable = true)
 |-- time: string (nullable = true)



In [27]:
# The frame comes from a MongoDB collection; discard its `_id` column and
# list the columns that remain.
weather = weather.drop('_id')
weather.printSchema()

root
 |-- cloud_cover: double (nullable = true)
 |-- date: string (nullable = true)
 |-- events: string (nullable = true)
 |-- max_dew_point_f: string (nullable = true)
 |-- max_gust_speed_mph: string (nullable = true)
 |-- max_humidity: string (nullable = true)
 |-- max_sea_level_pressure_inches: double (nullable = true)
 |-- max_temperature_f: double (nullable = true)
 |-- max_visibility_miles: string (nullable = true)
 |-- max_wind_Speed_mph: double (nullable = true)
 |-- mean_dew_point_f: string (nullable = true)
 |-- mean_humidity: string (nullable = true)
 |-- mean_sea_level_pressure_inches: double (nullable = true)
 |-- mean_temperature_f: double (nullable = true)
 |-- mean_visibility_miles: string (nullable = true)
 |-- mean_wind_speed_mph: double (nullable = true)
 |-- min_dew_point_f: string (nullable = true)
 |-- min_humidity: string (nullable = true)
 |-- min_sea_level_pressure_inches: double (nullable = true)
 |-- min_temperature_f: double (nullable = true)
 |-- min_visibi

In [29]:
# The frame comes from a MongoDB collection; discard its `_id` column and
# list the columns that remain.
trip = trip.drop('_id')
trip.printSchema()

root
 |-- bike_id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- end_date: string (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- start_date: string (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- trip_id: integer (nullable = true)
 |-- zip_code: string (nullable = true)



## 2. Preprocessing - Change Schema

In [34]:
# Target schema for the trip data, written as (column, type, nullable) triples
# in column order.  end_station_name and start_station_name are not part of
# this schema.
_TRIP_FIELD_SPECS = [
    ("bike_id", IntegerType(), True),
    ("duration", IntegerType(), True),
    ("end_date", TimestampType(), True),
    ("end_station_id", IntegerType(), True),
    ("start_date", TimestampType(), True),
    ("start_station_id", IntegerType(), True),
    ("subscription_type", StringType(), True),
    ("trip_id", IntegerType(), False),  # the only non-nullable column
    ("zip_code", IntegerType(), True),
]
tripSchema = StructType([StructField(*spec) for spec in _TRIP_FIELD_SPECS])

In [42]:
# Local import so this cell is self-contained.
from pyspark.sql import functions as F

# Layout of the raw date strings, e.g. '7/10/2014 7:28'.
TRIP_DATE_PATTERN = 'M/d/yyyy H:mm'


def _as_timestamp(column_name):
    """Return `column_name` parsed from its raw date string into a timestamp column."""
    seconds = F.unix_timestamp(F.col(column_name), TRIP_DATE_PATTERN)
    return seconds.cast('timestamp').alias(column_name)


# createDataFrame(rdd, schema) only checks values against the schema, it does
# not convert them.  In the raw frame end_date, start_date and zip_code are
# strings, so they are cast here first; values that cannot be converted (for
# example a zip_code of 'nil') become null.
trip_typed = trip.select(
    'bike_id',
    'duration',
    _as_timestamp('end_date'),
    'end_station_id',
    _as_timestamp('start_date'),
    'start_station_id',
    'subscription_type',
    'trip_id',
    F.col('zip_code').cast('int').alias('zip_code'))

# Re-apply tripSchema so new_trip carries exactly the declared types and
# nullability.
new_trip = sqlContext.createDataFrame(trip_typed.rdd, tripSchema)

In [49]:
# Columns kept from the raw trip frame (the two station-name columns are skipped).
trip_columns = [
    'bike_id', 'duration', 'end_date', 'end_station_id', 'start_date',
    'start_station_id', 'subscription_type', 'trip_id', 'zip_code',
]
new_trip_rdd = trip.select(*trip_columns).rdd

# Peek at two rows: the first two in the RDD's descending order.
new_trip_rdd.top(2)

[Row(bike_id=878, duration=49066, end_date=u'7/10/2014 7:28', end_station_id=61, start_date=u'7/9/2014 17:50', start_station_id=51, subscription_type=u'Customer', trip_id=357007, zip_code=u'nil'),
 Row(bike_id=878, duration=29137, end_date=u'12/11/2014 20:32', end_station_id=77, start_date=u'12/11/2014 12:26', start_station_id=39, subscription_type=u'Customer', trip_id=574216, zip_code=u'2143')]

In [None]:
# Local import so this cell is self-contained.
from datetime import datetime


def _parse_trip_date(text):
    """Parse a raw date string such as '7/10/2014 7:28'; None if it does not parse."""
    try:
        return datetime.strptime(text, '%m/%d/%Y %H:%M')
    except (TypeError, ValueError):
        return None


def _parse_int32(text):
    """Convert text to an int that fits a 32-bit IntegerType; None otherwise."""
    try:
        value = int(text)
    except (TypeError, ValueError):
        return None
    return value if -2147483648 <= value <= 2147483647 else None


def _coerce_trip_row(row):
    """Return the row's values, in tripSchema column order, with Python types the schema accepts."""
    return (
        row.bike_id,
        row.duration,
        _parse_trip_date(row.end_date),
        row.end_station_id,
        _parse_trip_date(row.start_date),
        row.start_station_id,
        row.subscription_type,
        row.trip_id,
        _parse_int32(row.zip_code),
    )


# new_trip_rdd holds end_date, start_date and zip_code as strings, and
# createDataFrame does not convert values to the schema's types, so each row
# is coerced before the schema is applied.
new_trip = sqlContext.createDataFrame(new_trip_rdd.map(_coerce_trip_row), tripSchema)

In [48]:
# Print new_trip's columns with their types and nullability.
# NOTE(review): the output recorded below lists end_station_name and
# start_station_name, which the tripSchema defined in this notebook does not
# contain - it looks like it came from an earlier run; re-run to confirm.
new_trip.printSchema()

root
 |-- bike_id: integer (nullable = true)
 |-- duration: integer (nullable = true)
 |-- end_date: timestamp (nullable = true)
 |-- end_station_id: integer (nullable = true)
 |-- end_station_name: string (nullable = true)
 |-- start_date: timestamp (nullable = true)
 |-- start_station_id: integer (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- subscription_type: string (nullable = true)
 |-- trip_id: integer (nullable = false)
 |-- zip_code: integer (nullable = true)



## Above: still need to split the date columns and add "date" and "hour" columns

In [37]:
# Target schema for the station data, written as (column, type, nullable)
# triples.  installation_date, lat, long and name are not part of this schema.
_STATION_FIELD_SPECS = [
    ("city", StringType(), True),
    ("dock_count", IntegerType(), True),
    ("station_id", IntegerType(), True),
]
stationSchema = StructType([StructField(*spec) for spec in _STATION_FIELD_SPECS])

## Above: join trip with station to add "start_station_city" and "end_station_city"

In [37]:
# Target schema for the weather data.  Every column is nullable; apart from
# `date`, `events` and `zip_code`, all columns are doubles.
_WEATHER_LEADING_FIELDS = [
    StructField("cloud_cover", DoubleType(), True),
    StructField("date", DateType(), True),
    StructField("events", StringType(), True),
]

# Double-typed measurement columns, in schema order.
_WEATHER_MEASUREMENT_COLUMNS = [
    "max_dew_point_f",
    "max_gust_speed_mph",
    "max_humidity",
    "max_sea_level_pressure_inches",
    "max_temperature_f",
    "max_visibility_miles",
    "max_wind_Speed_mph",
    "mean_dew_point_f",
    "mean_humidity",
    "mean_sea_level_pressure_inches",
    "mean_temperature_f",
    "mean_visibility_miles",
    "mean_wind_speed_mph",
    "min_dew_point_f",
    "min_humidity",
    "min_sea_level_pressure_inches",
    "min_temperature_f",
    "min_visibility_miles",
    "precipitation_inches",
    "wind_dir_degrees",
]

weatherSchema = StructType(
    _WEATHER_LEADING_FIELDS
    + [StructField(column, DoubleType(), True) for column in _WEATHER_MEASUREMENT_COLUMNS]
    + [StructField("zip_code", IntegerType(), True)]
)

## Above: use "date" and "zip_code" to join weather and trip

In [None]:
# Target schema for the status data (bikes/docks available per station and
# time).  It has its own name, statusSchema, so that defining it does not
# overwrite tripSchema, which describes the trip data.
statusSchema = StructType([
  StructField("bikes_available", IntegerType(), True),
  StructField("docks_available", IntegerType(), True),
  StructField("station_id", IntegerType(), True),
  StructField("time", TimestampType(), True)])

## Above: use all first and observe

In [54]:
# Number of weather rows for each zip code.
weather_rows_per_zip = weather.groupBy('zip_code').count()
weather_rows_per_zip.show()

+--------+-----+
|zip_code|count|
+--------+-----+
|   94041|  733|
|   94107|  733|
|   94063|  733|
|   95113|  733|
|   94301|  733|
+--------+-----+



In [66]:
# Number of trips for each zip_code value, most frequent first
# (prints up to 1000 rows).
trips_per_zip = trip.groupBy('zip_code').count()
trips_per_zip.sort('count', ascending=False).show(1000)

+----------+-----+
|  zip_code|count|
+----------+-----+
|     94107|78704|
|     94105|42672|
|     94133|31359|
|     94103|26673|
|     94111|21409|
|     94102|19757|
|     94109|13989|
|     95112|11564|
|       nil|10682|
|     94117| 9851|
|     94158| 9180|
|     94611| 8026|
|     94110| 7621|
|     94403| 7168|
|     94108| 7149|
|     94040| 7114|
|     94602| 6816|
|     94114| 6707|
|          | 6619|
|     94025| 6439|
|     94501| 6419|
|     94610| 6301|
|     95110| 6054|
|     94010| 6039|
|     94041| 5867|
|     94404| 5323|
|     94070| 5229|
|     95113| 4811|
|     94122| 4802|
|     94608| 4714|
|     94402| 4633|
|     94002| 4630|
|     94062| 4266|
|     94609| 3952|
|     94301| 3868|
|     94306| 3817|
|     94115| 3802|
|     94112| 3771|
|     94087| 3698|
|     94303| 3512|
|     94061| 3490|
|     94118| 3379|
|     94401| 3374|
|     94131| 3367|
|     94043| 3277|
|     94618| 3225|
|     94703| 3222|
|     94086| 3108|
|     94903| 3068|
|     95014|

In [68]:
# Peek at five rows: the first five in the RDD's descending order.
# The former `new_trip_rdd = new_trip_rdd.map(lambda x: x)` was an identity
# transform that rebound the name on every run without changing any row, so
# it has been removed; new_trip_rdd is used as defined earlier.
new_trip_rdd.top(5)

[Row(bike_id=878, duration=49066, end_date=u'7/10/2014 7:28', end_station_id=61, start_date=u'7/9/2014 17:50', start_station_id=51, subscription_type=u'Customer', trip_id=357007, zip_code=u'nil'),
 Row(bike_id=878, duration=29137, end_date=u'12/11/2014 20:32', end_station_id=77, start_date=u'12/11/2014 12:26', start_station_id=39, subscription_type=u'Customer', trip_id=574216, zip_code=u'2143'),
 Row(bike_id=878, duration=23783, end_date=u'9/1/2014 16:05', end_station_id=71, start_date=u'9/1/2014 9:29', start_station_id=76, subscription_type=u'Customer', trip_id=433001, zip_code=u'31100'),
 Row(bike_id=878, duration=23518, end_date=u'12/7/2014 18:18', end_station_id=75, start_date=u'12/7/2014 11:46', start_station_id=75, subscription_type=u'Customer', trip_id=568648, zip_code=u'94118'),
 Row(bike_id=878, duration=22545, end_date=u'5/2/2015 18:43', end_station_id=39, start_date=u'5/2/2015 12:28', start_station_id=39, subscription_type=u'Customer', trip_id=750357, zip_code=u'94102')]

In [73]:
# Print the columns of `table` with the types in effect for the CSV-backed frame.
table.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- zip_code: integer (nullable = true)
 |-- start_station_name: string (nullable = true)
 |-- num_trips: integer (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- max_temperature_f: double (nullable = true)
 |-- mean_temperature_f: double (nullable = true)
 |-- min_temperature_f: double (nullable = true)
 |-- max_dew_point_f: double (nullable = true)
 |-- mean_dew_point_f: double (nullable = true)
 |-- min_dew_point_f: double (nullable = true)
 |-- max_humidity: double (nullable = true)
 |-- mean_humidity: double (nullable = true)
 |-- min_humidity: double (nullable = true)
 |-- max_sea_level_pressure_inches: double (nullable = true)
 |-- mean_sea_level_pressure_inches: double (nullable = true)
 |-- min_sea_level_pressure_inches: double (nullable = true)
 |-- max_visibility_miles: double (nullable = true)
 |-- mean_visibility_miles: double (nullable = true)
 |-- min_visibility_miles: double (nullable = true)
 |-- max_wind_Speed_mph

In [109]:
# Count the rows in which day_of_week is null.
missing_day_of_week = table.filter(table['day_of_week'].isNull())
missing_day_of_week.count()

0