In [1]:
pwd

u'/Users/arnavsomani/Downloads'

In [2]:
import pyspark

In [3]:
sc

<pyspark.context.SparkContext at 0x1109f4d90>

In [4]:
#df = spark.read.load('citibike.csv',format = 'csv',header = True, inferSchema = False)
df = sqlContext.read.load('citibike.csv',format = 'csv',header = True, inferSchema = False)

In [5]:
df.dtypes

[('cartodb_id', 'string'),
 ('the_geom', 'string'),
 ('tripduration', 'string'),
 ('starttime', 'string'),
 ('stoptime', 'string'),
 ('start_station_id', 'string'),
 ('start_station_name', 'string'),
 ('start_station_latitude', 'string'),
 ('start_station_longitude', 'string'),
 ('end_station_id', 'string'),
 ('end_station_name', 'string'),
 ('end_station_latitude', 'string'),
 ('end_station_longitude', 'string'),
 ('bikeid', 'string'),
 ('usertype', 'string'),
 ('birth_year', 'string'),
 ('gender', 'string')]

In [6]:
from pyspark.sql import Row
import csv

def parseCSV(idx,part):
    if idx==0:
        part.next()
    for p in csv.reader(part):
        yield Row(tripduration=int(p[2]),
                 starttime=p[3],
                 start_station_time=p[6])

In [7]:
rows = sc.textFile('citibike.csv').mapPartitionsWithIndex(parseCSV)
df= sqlContext.createDataFrame(rows)

In [8]:
df.dtypes

[('start_station_time', 'string'),
 ('starttime', 'string'),
 ('tripduration', 'bigint')]

In [9]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
import csv

def parseCSV(idx,part):
    if idx==0:
        part.next()
    for p in csv.reader(part):
        yield (int(p[2]),p[3],p[6])

In [10]:
rows = sc.textFile('citibike.csv').mapPartitionsWithIndex(parseCSV)
schema = StructType([StructField('tripduration',IntegerType()),
                    StructField('starttime',StringType()),
                    StructField('start_station_name',StringType())
                    ])

In [11]:
df = sqlContext.createDataFrame(rows, schema)
df.dtypes

[('tripduration', 'int'),
 ('starttime', 'string'),
 ('start_station_name', 'string')]

In [12]:
df.show()

+------------+--------------------+--------------------+
|tripduration|           starttime|  start_station_name|
+------------+--------------------+--------------------+
|         801|2015-02-01 00:00:...|     8 Ave & W 31 St|
|         379|2015-02-01 00:00:...|  E 17 St & Broadway|
|        2474|2015-02-01 00:01:...|Grand Army Plaza ...|
|         818|2015-02-01 00:01:...|   6 Ave & Broome St|
|         544|2015-02-01 00:01:...|Lawrence St & Wil...|
|         717|2015-02-01 00:02:...|Willoughby Ave & ...|
|        1306|2015-02-01 00:04:...|     W 56 St & 6 Ave|
|         913|2015-02-01 00:04:...|      E 4 St & 2 Ave|
|         759|2015-02-01 00:04:...|Washington Pl & B...|
|         585|2015-02-01 00:05:...|Greenwich Ave & 8...|
|         581|2015-02-01 00:05:...|Greenwich Ave & 8...|
|         204|2015-02-01 00:05:...|  Broadway & W 32 St|
|        1169|2015-02-01 00:07:...|  Carmine St & 6 Ave|
|         419|2015-02-01 00:07:...|     E 11 St & 1 Ave|
|         527|2015-02-01 00:09:

In [13]:
import pyspark.sql.functions as sf

In [14]:
df.agg (sf.approx_count_distinct(df.tripduration).alias('c')).show()

+----+
|   c|
+----+
|2537|
+----+



In [15]:
df.agg (sf.approx_count_distinct(df.start_station_name).alias('c')).show()

+---+
|  c|
+---+
|337|
+---+



In [16]:
df.agg (sf.approx_count_distinct((df.tripduration/60).cast('int')).alias('c')).show()

+---+
|  c|
+---+
|169|
+---+



In [17]:
df.registerTempTable('citibike')

In [18]:
sqlContext.sql('select percentile(tripduration, 0.5) from citibike').collect()

[Row(percentile(tripduration, CAST(0.5 AS DOUBLE))=529.0)]

In [19]:
df.rdd.take(5)

[Row(tripduration=801, starttime=u'2015-02-01 00:00:00+00', start_station_name=u'8 Ave & W 31 St'),
 Row(tripduration=379, starttime=u'2015-02-01 00:00:00+00', start_station_name=u'E 17 St & Broadway'),
 Row(tripduration=2474, starttime=u'2015-02-01 00:01:00+00', start_station_name=u'Grand Army Plaza & Central Park S'),
 Row(tripduration=818, starttime=u'2015-02-01 00:01:00+00', start_station_name=u'6 Ave & Broome St'),
 Row(tripduration=544, starttime=u'2015-02-01 00:01:00+00', start_station_name=u'Lawrence St & Willoughby St')]

In [20]:
SAT_FN = 'Sat_Results.csv'
HSD_FN = 'DOE_High_School_Directory_2014-2015.csv'

In [21]:
dfSchools = sqlContext.read.load(HSD_FN,format = 'csv',header = True, inferSchema = False) \
                        .na.drop(subset=['boro'])

dfSchools = dfSchools.filter(dfSchools['total_students']>500)
dfSchools = dfSchools.select('dbn','boro')    
dfSchools.take(2)
dfSchools.count()

176

In [22]:
dfScores = sqlContext.read.load(SAT_FN,format = 'csv',header = True, inferSchema = True)
dfScores.dtypes

[('DBN', 'string'),
 ('SCHOOL NAME', 'string'),
 ('Num of SAT Test Takers', 'string'),
 ('SAT Critical Reading Avg. Score', 'string'),
 ('SAT Math Avg. Score', 'string'),
 ('SAT Writing Avg. Score', 'string')]

In [23]:
#dfScores.select(dfScores['`SAT Math Avg. Score`'].cast('int')).show
dfScores.select(dfScores['`SAT Math Avg. Score`'].cast('int'))

DataFrame[SAT Math Avg. Score: int]

In [24]:
dfMathScores = dfScores.select('DBN',dfScores['`SAT Math Avg. Score`'].cast('int').alias('score'),
                              dfScores['`Num of SAT Test Takers`'].cast('int').alias('ntakers')
                              ).na.drop()

dfMScores = dfMathScores.select('DBN',
                                   (dfMathScores.score*dfMathScores.ntakers).alias('sum_scores'),'ntakers')




In [25]:
df = dfSchools.join(dfMScores, dfSchools.dbn == dfMScores.DBN, how='inner') \
                    .groupby('boro') \
                    .sum('sum_scores','ntakers')
        

In [26]:
df.withColumn('avg',(df[1]/df[2].cast('int'))).select('boro','avg').show()

+-------------+------------------+
|         boro|               avg|
+-------------+------------------+
|       Queens| 474.3679400475233|
|     Brooklyn|487.46256168204246|
|Staten Island| 477.9099864130435|
|    Manhattan| 514.9312780989081|
|        Bronx|  470.198606271777|
+-------------+------------------+



In [27]:
from google.transit import gtfs_realtime_pb2

In [28]:
with open('subway/gtfs-20170409T0400Z', 'rb') as fi:
    feed = gtfs_realtime_pb2.FeedMessage()
    feed.ParseFromString(fi.read())
    print feed.entity

[id: "000001"
trip_update {
  trip {
    trip_id: "000200_2..S01X017"
    start_date: "20170409"
    route_id: "2"
  }
  stop_time_update {
    departure {
      time: 1491710520
    }
    stop_id: "201S"
  }
  stop_time_update {
    arrival {
      time: 1491710610
    }
    departure {
      time: 1491710610
    }
    stop_id: "204S"
  }
  stop_time_update {
    arrival {
      time: 1491710700
    }
    departure {
      time: 1491710700
    }
    stop_id: "205S"
  }
  stop_time_update {
    arrival {
      time: 1491710760
    }
    departure {
      time: 1491710760
    }
    stop_id: "206S"
  }
  stop_time_update {
    arrival {
      time: 1491710820
    }
    departure {
      time: 1491710820
    }
    stop_id: "207S"
  }
  stop_time_update {
    arrival {
      time: 1491710910
    }
    departure {
      time: 1491710910
    }
    stop_id: "208S"
  }
  stop_time_update {
    arrival {
      time: 1491711000
    }
    departure {
      time: 1491711000
    }
    stop_id: "209

In [29]:
for entity in feed.entity:
    if entity.HasField('trip_update'):
        tu = entity.trip_update
        print (tu.trip.route_id, tu.stop_time_update[0].stop_id)
        break

(u'2', u'201S')


In [30]:
def getStops(_, part):
    feed = gtfs_realtime_pb2.FeedMessage()
    for fn,contents in part:
        feed.ParseFromString(contents)
        for entity in feed.entity:
            if entity.HasField('trip_update'):
                tu = entity.trip_update
                yield (tu.trip.route_id, tu.stop_time_update[0].stop_id)
                break

In [31]:
rdd = sc.binaryFiles('subway') \
        .mapPartitionsWithIndex(getStops).map(lambda x: (x,1)).reduceByKey(lambda x,y: x+y)

In [32]:
sorted(rdd.collect(), key=lambda x: -x[1])

[((u'2', u'201S'), 6),
 ((u'SI', u'S09'), 6),
 ((u'N', u'N10'), 5),
 ((u'L', u'L28S'), 3),
 ((u'D', u'A15'), 3),
 ((u'2', u'245N'), 2),
 ((u'L', u'L25S'), 2),
 ((u'N', u'R25'), 2),
 ((u'D', u'N10'), 2),
 ((u'SI', u'S31'), 2),
 ((u'N', u'R24'), 1),
 ((u'D', u'A24'), 1),
 ((u'L', u'L27S'), 1),
 ((u'D', u'A14'), 1),
 ((u'L', u'L26S'), 1),
 ((u'L', u'L24S'), 1)]

In [33]:
pwd

u'/Users/arnavsomani/Downloads'

In [34]:
dfTweets = sqlContext.read.load('twitter_1k.jsonl.txt', format='json')

In [35]:
dfTweets.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true

In [36]:
hashtags = dfTweets.select('entities.hashtags.text')
hashtags = hashtags.filter(sf.size('text')>0)
hashtags = hashtags.select(sf.explode('text').alias('hashtag'))
hashtags = hashtags.groupBy('hashtag').count().orderBy('count', ascending=False)
hashtags.show()

+------------------+-----+
|           hashtag|count|
+------------------+-----+
|USSaluteouttuesday|   33|
|               NYC|    5|
| privatepassSports|    4|
|               nyc|    4|
|         superbowl|    3|
|       RobinThicke|    3|
|              espn|    3|
|               Job|    3|
|   SuperBowlXLVIII|    3|
|         redcarpet|    3|
|      ESPNTHEPARTY|    2|
|             ncaaw|    2|
|              Jobs|    2|
|             facts|    2|
|               Raw|    2|
|           NewYork|    2|
|              soho|    2|
|    henleyvaporium|    2|
|                IT|    2|
|         SuperBowl|    2|
+------------------+-----+
only showing top 20 rows

