# Spark data frames from CSV files: handling headers & column types

Start pyspark with IPython notebook with ``IPYTHON_OPTS="notebook" $SPARK_HOME/bin/pyspark`` from the command prompt

In [1]:
from pyspark.sql import SQLContext
from pyspark.sql.types import *  
sqlContext = SQLContext(sc)

Change the path in the command below to reflect the directory where you have saved the file ``nyctaxisub.csv``

In [2]:
taxiFile = sc.textFile("file:///home/ctsats/datasets/BDU_Spark/nyctaxisub.csv")
taxiFile.count()

250000

In [3]:
taxiFile.take(2)

[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"',
 u'"29b3f4a30dea6688d4c289c9672cb996","1-ddfdec8050c7ef4dc694eeeda6c4625e","2013-01-11 22:03:00",+4.07033460000000E+001,-7.40144200000000E+001,"A93D1F7F8998FFB75EEF477EB6077516","68BC16A99E915E44ADA7E639B4DD5F59",2,"2013-01-11 21:48:00",+4.06760670000000E+001,-7.39810790000000E+001,1,,+4.08000000000000E+000,900,"VTS"']

Let's isolate the header, in order to eventually use it to get the field names:

In [4]:
header = taxiFile.first()
header

u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"'

We want to get rid of these double quotes around the field names, and then use the header to build the fields for our schema:

In [5]:
schemaString = header.replace('"','')
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split(',')]
fields

[StructField(_id,StringType,true),
 StructField(_rev,StringType,true),
 StructField(dropoff_datetime,StringType,true),
 StructField(dropoff_latitude,StringType,true),
 StructField(dropoff_longitude,StringType,true),
 StructField(hack_license,StringType,true),
 StructField(medallion,StringType,true),
 StructField(passenger_count,StringType,true),
 StructField(pickup_datetime,StringType,true),
 StructField(pickup_latitude,StringType,true),
 StructField(pickup_longitude,StringType,true),
 StructField(rate_code,StringType,true),
 StructField(store_and_fwd_flag,StringType,true),
 StructField(trip_distance,StringType,true),
 StructField(trip_time_in_secs,StringType,true),
 StructField(vendor_id,StringType,true)]

How many elements there are in the header (i.e. how many data columns)?

In [6]:
len(fields)

16

OK, now let's modify the fields which should not be of type ``String``:

In [7]:
fields[2].dataType = TimestampType()
fields[3].dataType = FloatType()
fields[4].dataType = FloatType()
fields[7].dataType = IntegerType()
fields[8].dataType = TimestampType()
fields[9].dataType = FloatType()
fields[10].dataType = FloatType()
fields[11].dataType = IntegerType()
fields[13].dataType = FloatType()
fields[14].dataType = IntegerType()
fields

[StructField(_id,StringType,true),
 StructField(_rev,StringType,true),
 StructField(dropoff_datetime,TimestampType,true),
 StructField(dropoff_latitude,FloatType,true),
 StructField(dropoff_longitude,FloatType,true),
 StructField(hack_license,StringType,true),
 StructField(medallion,StringType,true),
 StructField(passenger_count,IntegerType,true),
 StructField(pickup_datetime,TimestampType,true),
 StructField(pickup_latitude,FloatType,true),
 StructField(pickup_longitude,FloatType,true),
 StructField(rate_code,IntegerType,true),
 StructField(store_and_fwd_flag,StringType,true),
 StructField(trip_distance,FloatType,true),
 StructField(trip_time_in_secs,IntegerType,true),
 StructField(vendor_id,StringType,true)]

Let's also get rid of the leading underscores in the first two field names (``_id`` and ``_rev``):

In [8]:
fields[0].name = 'id'
fields[1].name = 'rev'
fields

[StructField(id,StringType,true),
 StructField(rev,StringType,true),
 StructField(dropoff_datetime,TimestampType,true),
 StructField(dropoff_latitude,FloatType,true),
 StructField(dropoff_longitude,FloatType,true),
 StructField(hack_license,StringType,true),
 StructField(medallion,StringType,true),
 StructField(passenger_count,IntegerType,true),
 StructField(pickup_datetime,TimestampType,true),
 StructField(pickup_latitude,FloatType,true),
 StructField(pickup_longitude,FloatType,true),
 StructField(rate_code,IntegerType,true),
 StructField(store_and_fwd_flag,StringType,true),
 StructField(trip_distance,FloatType,true),
 StructField(trip_time_in_secs,IntegerType,true),
 StructField(vendor_id,StringType,true)]

Now that we are satisfied with the data types, we can construct our schema, which we will use later below for building the dataframe:

In [9]:
schema = StructType(fields)
schema

StructType(List(StructField(id,StringType,true),StructField(rev,StringType,true),StructField(dropoff_datetime,TimestampType,true),StructField(dropoff_latitude,FloatType,true),StructField(dropoff_longitude,FloatType,true),StructField(hack_license,StringType,true),StructField(medallion,StringType,true),StructField(passenger_count,IntegerType,true),StructField(pickup_datetime,TimestampType,true),StructField(pickup_latitude,FloatType,true),StructField(pickup_longitude,FloatType,true),StructField(rate_code,IntegerType,true),StructField(store_and_fwd_flag,StringType,true),StructField(trip_distance,FloatType,true),StructField(trip_time_in_secs,IntegerType,true),StructField(vendor_id,StringType,true)))

Isolate the header and drop it off the actual data:

In [10]:
taxiHeader = taxiFile.filter(lambda l: "_id" in l)
taxiHeader.collect()

[u'"_id","_rev","dropoff_datetime","dropoff_latitude","dropoff_longitude","hack_license","medallion","passenger_count","pickup_datetime","pickup_latitude","pickup_longitude","rate_code","store_and_fwd_flag","trip_distance","trip_time_in_secs","vendor_id"']

In [11]:
taxiNoHeader = taxiFile.subtract(taxiHeader)
taxiNoHeader.count()

249999

We end up with 249,999 rows, as expected.

Before parsing the data, we have to import the necessary Python modules to handle ``datetimes``:

In [12]:
from datetime import *
from dateutil.parser import parse
# test it:
parse("2013-02-09 18:16:10")

datetime.datetime(2013, 2, 9, 18, 16, 10)

We are now ready for our first attempt to parse the data with the correct types. We build a temporary RDD for this purpose - ``taxi_temp``:

In [13]:
taxi_temp = taxiNoHeader.map(lambda k: k.split(",")).map(lambda p: (p[0], p[1], parse(p[2].strip('"')), float(p[3]), float(p[4]) , p[5], p[6] , int(p[7]), parse(p[8].strip('"')), float(p[9]), float(p[10]), int(p[11]), p[12], float(p[13]), int(p[14]), p[15] ))
taxi_temp.top(2)

[(u'"fff43e5eb5662eecf42a3f9b5ff42214"',
  u'"1-2e9ea2f49a29663d699d1940f42fab66"',
  datetime.datetime(2013, 11, 26, 13, 15),
  40.764915,
  -73.982536,
  u'"564F38A1BC4B1AA7EC528E6C2C81EAAC"',
  u'"3E29713986A6762D985C4FC53B177F61"',
  1,
  datetime.datetime(2013, 11, 26, 13, 2),
  40.786667,
  -73.972023,
  1,
  u'',
  1.87,
  780,
  u'"VTS"'),
 (u'"fff43e5eb5662eecf42a3f9b5ff1fc5b"',
  u'"1-18b010dab3a3f83ebf4b9f31e88c615d"',
  datetime.datetime(2013, 11, 26, 3, 59),
  40.686081,
  -73.952072,
  u'"5E3208C5FA0E44EA08223489E3853EAD"',
  u'"DC67FC4851D7642EDCA34A8A3C44F116"',
  1,
  datetime.datetime(2013, 11, 26, 3, 42),
  40.740715,
  -74.004562,
  1,
  u'',
  5.84,
  1020,
  u'"VTS"')]

Finally, let's build our dataframe, using the ``taxi_temp`` RDD just produced and the ``schema`` variable computed above:

In [14]:
taxi_df = sqlContext.createDataFrame(taxi_temp, schema)
taxi_df.head(2)

[Row(id=u'"e6b3fa7bee24a30c25ce87e44e714457"', rev=u'"1-9313152f4894bb47678d8ce98e9ec733"', dropoff_datetime=datetime.datetime(2013, 2, 9, 18, 16), dropoff_latitude=40.73524856567383, dropoff_longitude=-73.99406433105469, hack_license=u'"88F8DD623E5090083988CD32C84973E3"', medallion=u'"6B96DDFB5A50B96E72F5808ABE778B17"', passenger_count=1, pickup_datetime=datetime.datetime(2013, 2, 9, 17, 59), pickup_latitude=40.775123596191406, pickup_longitude=-73.96345520019531, rate_code=1, store_and_fwd_flag=u'', trip_distance=3.4600000381469727, trip_time_in_secs=1020, vendor_id=u'"VTS"'),
 Row(id=u'"cbee283a4613f85af67f79c6d7721234"', rev=u'"1-c1bd2aecbf3936b30c486aa3deade97b"', dropoff_datetime=datetime.datetime(2013, 1, 11, 17, 2), dropoff_latitude=40.826969146728516, dropoff_longitude=-73.94998931884766, hack_license=u'"5514E59A5CEA0379EA6F7F12ABE87489"', medallion=u'"3541D0677EEEA07B67E645E12F04F517"', passenger_count=1, pickup_datetime=datetime.datetime(2013, 1, 11, 16, 29), pickup_latitude

We see that we still have quotes-within-quotes in our ``StringType`` variables. We make a second attempt, this time using Spark's ``rdd.toDF()`` method, in order to build the dataframe directly from ``taxiNoHeader`` RDD, without invoking the temporary ``taxi_temp`` RDD:

In [15]:
taxi_df = taxiNoHeader.map(lambda k: k.split(",")).map(lambda p: (p[0].strip('"'), p[1].strip('"'), parse(p[2].strip('"')), float(p[3]), float(p[4]) , p[5].strip('"'), p[6].strip('"') , int(p[7]), parse(p[8].strip('"')), float(p[9]), float(p[10]), int(p[11]), p[12].strip('"'), float(p[13]), int(p[14]), p[15].strip('"')) ).toDF(schema)
taxi_df.head(2)

[Row(id=u'e6b3fa7bee24a30c25ce87e44e714457', rev=u'1-9313152f4894bb47678d8ce98e9ec733', dropoff_datetime=datetime.datetime(2013, 2, 9, 18, 16), dropoff_latitude=40.73524856567383, dropoff_longitude=-73.99406433105469, hack_license=u'88F8DD623E5090083988CD32C84973E3', medallion=u'6B96DDFB5A50B96E72F5808ABE778B17', passenger_count=1, pickup_datetime=datetime.datetime(2013, 2, 9, 17, 59), pickup_latitude=40.775123596191406, pickup_longitude=-73.96345520019531, rate_code=1, store_and_fwd_flag=u'', trip_distance=3.4600000381469727, trip_time_in_secs=1020, vendor_id=u'VTS'),
 Row(id=u'cbee283a4613f85af67f79c6d7721234', rev=u'1-c1bd2aecbf3936b30c486aa3deade97b', dropoff_datetime=datetime.datetime(2013, 1, 11, 17, 2), dropoff_latitude=40.826969146728516, dropoff_longitude=-73.94998931884766, hack_license=u'5514E59A5CEA0379EA6F7F12ABE87489', medallion=u'3541D0677EEEA07B67E645E12F04F517', passenger_count=1, pickup_datetime=datetime.datetime(2013, 1, 11, 16, 29), pickup_latitude=40.77362823486328

Let's run some simple pandas-like queries. How many records per vendor are there in the dataset?

In [16]:
taxi_df.groupBy("vendor_id").count().show()

vendor_id count 
CMT       114387
VTS       135612


Recall that we have missing values in the field ``store_and_fwd_flag``. How many are they?

In [17]:
taxi_df.filter(taxi_df.store_and_fwd_flag == '').count()

135616L

OK, the number of missing values is dangerously close to the number of ``VTS`` vendor records. Is this a coincidence, or vendor ``VTS`` indeed tends not to log the subject variable?

In [18]:
taxi_df.filter(taxi_df.store_and_fwd_flag == '' and taxi_df.vendor_id == 'VTS').count()

135612L

Well, we have a finding! Indeed, all records coming from ``VTS`` vendor have missing value in the subject field...

``dtypes`` and ``printSchema()`` methods can be used to get information about the schema:

In [19]:
taxi_df.dtypes

[('id', 'string'),
 ('rev', 'string'),
 ('dropoff_datetime', 'timestamp'),
 ('dropoff_latitude', 'float'),
 ('dropoff_longitude', 'float'),
 ('hack_license', 'string'),
 ('medallion', 'string'),
 ('passenger_count', 'int'),
 ('pickup_datetime', 'timestamp'),
 ('pickup_latitude', 'float'),
 ('pickup_longitude', 'float'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('trip_distance', 'float'),
 ('trip_time_in_secs', 'int'),
 ('vendor_id', 'string')]

In [20]:
taxi_df.printSchema()

root
 |-- id: string (nullable = true)
 |-- rev: string (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- dropoff_latitude: float (nullable = true)
 |-- dropoff_longitude: float (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- medallion: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- pickup_latitude: float (nullable = true)
 |-- pickup_longitude: float (nullable = true)
 |-- rate_code: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- vendor_id: string (nullable = true)



We can run the SQL equivalent of the above pandas-like queries. First, we have to register the dataframe as a named temporary table, let's say ``taxi``:

In [21]:
taxi_df.registerTempTable("taxi")

In [22]:
sqlContext.sql("SELECT vendor_id, COUNT(*) FROM taxi GROUP BY vendor_id ").show()

vendor_id c1    
CMT       114387
VTS       135612


In [23]:
sqlContext.sql("SELECT COUNT(*) FROM taxi WHERE store_and_fwd_flag = '' ").show()

c0    
135616


In [24]:
sqlContext.sql("SELECT COUNT(*) FROM taxi WHERE vendor_id = 'VTS' AND store_and_fwd_flag = '' ").show()

c0    
135612


Notice that, unlike standard SQL, table and column names are case sensitive, i.e. ``TAXI`` or ``vendor_ID`` in the queries will produce an error.

Let's change some column names to shorter versions:

In [25]:
taxi_df = taxi_df.withColumnRenamed('dropoff_longitude', 'dropoff_long').withColumnRenamed('dropoff_latitude', 'dropoff_lat').withColumnRenamed('pickup_latitude', 'pickup_lat').withColumnRenamed('pickup_longitude', 'pickup_long')

In [26]:
taxi_df.dtypes

[('id', 'string'),
 ('rev', 'string'),
 ('dropoff_datetime', 'timestamp'),
 ('dropoff_lat', 'float'),
 ('dropoff_long', 'float'),
 ('hack_license', 'string'),
 ('medallion', 'string'),
 ('passenger_count', 'int'),
 ('pickup_datetime', 'timestamp'),
 ('pickup_lat', 'float'),
 ('pickup_long', 'float'),
 ('rate_code', 'int'),
 ('store_and_fwd_flag', 'string'),
 ('trip_distance', 'float'),
 ('trip_time_in_secs', 'int'),
 ('vendor_id', 'string')]

Finally, let's make a row selection and store the results to a pandas dataframe:

In [27]:
import pandas as pd
taxi_CMT = taxi_df.filter("vendor_id = 'CMT' and store_and_fwd_flag != '' ").toPandas()

In [28]:
taxi_CMT.head()

Unnamed: 0,id,rev,dropoff_datetime,dropoff_lat,dropoff_long,hack_license,medallion,passenger_count,pickup_datetime,pickup_lat,pickup_long,rate_code,store_and_fwd_flag,trip_distance,trip_time_in_secs,vendor_id
0,e4fb64b76eb99d4ac222713eb36f1afb,1-233ff643b7f105b7a76ec05cf4f0f6db,2013-11-26 11:51:40,40.76207,-73.968262,912A2B86F30CDFE246586972A892367E,F3241FAB90B4B14FC46C3F11CC14B79E,1,2013-11-26 11:36:54,40.779324,-73.977455,1,N,1.7,886,CMT
1,a0dbc88f34c35a620c3a33af7d447bb2,1-09c485081ed511298abe1d5a0a976e67,2013-02-11 20:31:18,40.795536,-73.966873,4CDB4439568A22F50E68E6C767583F0E,A5A8269908F5D906140559A300992053,1,2013-02-11 20:14:06,40.739632,-74.00267,1,N,5.3,1031,CMT
2,22d54bc53694ffa796879114d35dde53,1-239114ce02a0b43667c2f5db2bb5d34f,2013-11-26 08:59:34,40.755272,-73.972351,C5ADEC336825DEB30222ED03016EC2EA,AD1848EF6C8D8D832D8E9C8A83D58E32,1,2013-11-26 08:41:52,40.770805,-73.950882,1,N,2.1,1061,CMT
3,57cf267a1fe6533edd94a5883b904a60,1-0c2111ef3fbd25eb1775ce3fc460de29,2013-11-26 12:37:56,40.7341,-73.988892,107A492A8269674DF2174B2A33D751C5,87D6A5AF77EA7F5F31213AADB50B7508,1,2013-11-26 12:24:24,40.703072,-74.011734,1,N,4.4,811,CMT
4,952ae0acb1d3a1dcbe4dbdebbabd81b5,1-cef51bf1e73f95a3426e974cf6c750e2,2013-02-11 14:32:20,40.772598,-73.982445,711FF480F454257CDB3DD2E67A080687,271217702A1E3484D03FE5B2B3E49146,1,2013-02-11 14:17:00,40.797695,-73.971397,1,N,1.9,919,CMT
