In [1]:
# Display the SparkContext (`sc`) to confirm the PySpark session is available.
sc

<pyspark.context.SparkContext at 0x111b3dd90>

In [2]:
# Load the Citi Bike CSV using its header row for column names. Schema
# inference is explicitly disabled, so every column is read as a string.
# (The option is spelled `inferSchema`; the misspelled `inferScheme` was
# silently ignored by the CSV reader.)
df = spark.read.load('citibike.csv', format='csv', header=True, inferSchema=False)

In [3]:
# Inspect the (column name, type) pairs of the first five columns.
df.dtypes[:5]

[('cartodb_id', 'string'),
 ('the_geom', 'string'),
 ('tripduration', 'string'),
 ('starttime', 'string'),
 ('stoptime', 'string')]

In [4]:
from pyspark.sql import Row
import csv

def parseCSV(idx, part):
    """Parse one partition of raw CSV text lines into ``Row`` objects.

    Args:
        idx: Index of the partition being processed.
        part: Iterator over the text lines of that partition.

    Yields:
        A ``Row`` holding field 2 converted to ``int`` as ``tripduration``,
        field 3 as ``startingtime`` and field 6 as ``start_station_time``.

    Raises:
        ValueError: If field 2 of a record is not a valid integer.
        IndexError: If a record has fewer than seven fields.
    """
    if idx == 0:
        # The first line of partition 0 is assumed to be the CSV header and
        # is skipped. The builtin next() works on both Python 2 and 3
        # (part.next() is Python 2 only), and the default value keeps an
        # empty partition from raising StopIteration inside the generator.
        next(part, None)
    for p in csv.reader(part):
        yield Row(tripduration=int(p[2]),
                  startingtime=p[3],
                  start_station_time=p[6])
                  
# Read the CSV as plain text lines and parse each partition with parseCSV.
rows = sc.textFile('citibike.csv').mapPartitionsWithIndex(parseCSV)
# No explicit schema is passed here, so the column types are derived from the Row values.
df = sqlContext.createDataFrame(rows)

In [5]:
from pyspark.sql import Row
from pyspark.sql.types import *
import csv

def parseCSV(idx, part):
    """Parse one partition of raw CSV text lines into plain tuples.

    Args:
        idx: Index of the partition being processed.
        part: Iterator over the text lines of that partition.

    Yields:
        A 3-tuple ``(int(field 2), field 3, field 6)`` for every record.

    Raises:
        ValueError: If field 2 of a record is not a valid integer.
        IndexError: If a record has fewer than seven fields.
    """
    if idx == 0:
        # The first line of partition 0 is assumed to be the CSV header and
        # is skipped. The builtin next() works on both Python 2 and 3
        # (part.next() is Python 2 only), and the default value keeps an
        # empty partition from raising StopIteration inside the generator.
        next(part, None)
    for p in csv.reader(part):
        yield (int(p[2]), p[3], p[6])
                  
# Parse the raw text file into (tripduration, starttime, station) tuples.
rows = sc.textFile('citibike.csv').mapPartitionsWithIndex(parseCSV)

# Explicit schema: one integer column followed by two string columns.
schema = StructType([
    StructField('tripduration', IntegerType()),
    StructField('starttime', StringType()),
    StructField('start_station_time', StringType()),
])

df = sqlContext.createDataFrame(rows, schema)
df.dtypes

[('tripduration', 'int'),
 ('starttime', 'string'),
 ('start_station_time', 'string')]

In [6]:
import pyspark.sql.functions as sf

# Approximate number of distinct trip durations, reported in a column named 'c'.
df.agg(
    sf.approx_count_distinct(df['tripduration']).alias('c')
).show()

+----+
|   c|
+----+
|2537|
+----+



In [7]:
# Expose the DataFrame to Spark SQL under the name 'citibike'.
# registerTempTable is deprecated since Spark 2.0; createOrReplaceTempView
# is its direct replacement with the same semantics.
df.createOrReplaceTempView('citibike')

In [8]:
# Fetch the five rows with the smallest tripduration from the 'citibike' table.
sqlContext.sql('select * from citibike order by tripduration limit 5').collect()

[Row(tripduration=60, starttime=u'2015-02-02 21:39:00+00', start_station_time=u'W 31 St & 7 Ave'),
 Row(tripduration=60, starttime=u'2015-02-04 07:52:00+00', start_station_time=u'W 29 St & 9 Ave'),
 Row(tripduration=60, starttime=u'2015-02-03 08:07:00+00', start_station_time=u'Mott St & Prince St'),
 Row(tripduration=60, starttime=u'2015-02-02 17:14:00+00', start_station_time=u'5 Ave & E 29 St'),
 Row(tripduration=60, starttime=u'2015-02-03 18:01:00+00', start_station_time=u'Grand St & Havemeyer St')]

In [9]:
# Compute the 0.5 percentile (i.e. the median) of tripduration via Spark SQL.
sqlContext.sql('select percentile(tripduration, 0.5) from citibike').collect()

[Row(percentile(tripduration, CAST(0.5 AS DOUBLE))=529.0)]

In [10]:
# Input file names used by the cells below (paths are relative to the working directory).
SAT_FN = 'SAT_Results.csv'  # SAT results CSV
HSD_FN = 'DOE_High_School_Directory_2014-2015.csv'  # high school directory CSV

In [11]:
# Schools directory: drop rows without a borough, keep only schools with more
# than 500 students, and retain just the school id and borough columns.
dfSchools = (
    spark.read.load(HSD_FN, format='csv', header=True, inferSchema=True)
         .na.drop(subset=['boro'])
         .filter(sf.col('total_students') > 500)
         .select('dbn', 'boro')
)
dfSchools.count()

176

In [12]:
# Load the SAT results CSV with a header row and schema inference enabled.
dfScores = (
    sqlContext.read
              .format('csv')
              .options(header=True, inferSchema=True)
              .load(SAT_FN)
)
# Backticks are needed because the column name contains a dot.
math_score = dfScores['`SAT Math Avg. Score`']
dfScores.select(math_score.cast('int'))


DataFrame[SAT Math Avg. Score: int]

In [13]:
# List the (column name, type) pairs of the SAT scores DataFrame.
dfScores.dtypes

[('DBN', 'string'),
 ('SCHOOL NAME', 'string'),
 ('Num of SAT Test Takers', 'string'),
 ('SAT Critical Reading Avg. Score', 'string'),
 ('SAT Math Avg. Score', 'string'),
 ('SAT Writing Avg. Score', 'string')]

In [14]:
# Per-school math totals: cast the score and test-taker columns to int, drop
# rows with any null among the selected columns, then weight each school's
# average score by its number of test takers.
dfMScores = (
    dfScores
    .select(
        'DBN',
        sf.col('`SAT Math Avg. Score`').cast('int').alias('score'),
        sf.col('Num of SAT Test Takers').cast('int').alias('ntakers'),
    )
    .na.drop()
    .select(
        'DBN',
        (sf.col('score') * sf.col('ntakers')).alias('sum_score'),
        'ntakers',
    )
)

dfMScores.show()

+------+---------+-------+
|   DBN|sum_score|ntakers|
+------+---------+-------+
|02M047|     6400|     16|
|21K410|   207575|    475|
|30Q301|    43120|     98|
|17K382|    22066|     59|
|18K637|    13335|     35|
|32K403|    18300|     50|
|09X365|    18306|     54|
|11X270|    22064|     56|
|05M367|    12078|     33|
|14K404|    24276|     68|
|30Q575|    66420|    135|
|13K336|     3366|      9|
|04M635|    17712|     48|
|24Q264|    40406|     89|
|17K408|    19494|     57|
|19K618|    22260|     60|
|27Q309|    13644|     36|
|32K552|    24388|     67|
|13K499|    26208|     72|
|07X600|    30400|     76|
+------+---------+-------+
only showing top 20 rows



In [15]:
# Join schools with their math totals on the school id, then sum the weighted
# scores and the test-taker counts per borough.
df = (
    dfSchools
    .join(dfMScores, dfSchools.dbn == dfMScores.DBN, how='inner')
    .groupBy('boro')
    .sum('sum_score', 'ntakers')
)
df.show()

+-------------+--------------+------------+
|         boro|sum(sum_score)|sum(ntakers)|
+-------------+--------------+------------+
|       Queens|       5190534|       10942|
|     Brooklyn|       4544126|        9322|
|Staten Island|       1406967|        2944|
|    Manhattan|       3206992|        6228|
|        Bronx|       1619364|        3444|
+-------------+--------------+------------+



In [16]:
# Average math score per borough: total weighted score divided by total
# test takers, truncated to an integer.
df.select(
    'boro',
    (df['sum(sum_score)'] / df['sum(ntakers)']).cast('int').alias('avg'),
).show()

+-------------+---+
|         boro|avg|
+-------------+---+
|       Queens|474|
|     Brooklyn|487|
|Staten Island|477|
|    Manhattan|514|
|        Bronx|470|
+-------------+---+



In [17]:
# Show the RDD that underlies the DataFrame.
df.rdd

MapPartitionsRDD[88] at javaToPython at NativeMethodAccessorImpl.java:0

In [19]:
# Load the tweets file using the JSON data source.
dfTweets = sqlContext.read.format('json').load('twitter_1k.jsonl')

In [26]:
# Count hashtag occurrences: take each tweet's array of hashtag texts, keep
# only non-empty arrays, flatten to one hashtag per row, then count per
# hashtag and sort by descending count.
hashtags = (
    dfTweets
    .select('entities.hashtags.text')
    .filter(sf.size('text') > 0)
    .select(sf.explode('text').alias('hashtag'))
    .groupBy('hashtag')
    .count()
    .orderBy('count', ascending=False)
)
hashtags.show()

+------------------+-----+
|           hashtag|count|
+------------------+-----+
|USSaluteouttuesday|   33|
|               NYC|    5|
| privatepassSports|    4|
|               nyc|    4|
|         superbowl|    3|
|         redcarpet|    3|
|               Job|    3|
|   SuperBowlXLVIII|    3|
|              espn|    3|
|       RobinThicke|    3|
|    henleyvaporium|    2|
|             ncaaw|    2|
|      ESPNTHEPARTY|    2|
|             facts|    2|
|         SuperBowl|    2|
|           NewYork|    2|
|              soho|    2|
|                IT|    2|
|               Raw|    2|
|              Jobs|    2|
+------------------+-----+
only showing top 20 rows

