In [1]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
import warnings

# NOTE: this silences *all* Python warnings for the rest of the session.
warnings.filterwarnings("ignore")

# Start the spark context.
# `swan_spark_conf` is not defined anywhere in this notebook; it is presumably
# injected by the SWAN / SparkMonitor kernel extension. Fall back to Spark's
# default configuration when it is absent so the notebook also runs on a fresh,
# plain kernel (Restart & Run All).
try:
    spark_conf = swan_spark_conf
except NameError:
    spark_conf = None
sc = SparkContext.getOrCreate(conf=spark_conf)
print("Created Spark Context")

# create a spark session (which will run spark jobs); reuses the context above
spark = SparkSession.builder.getOrCreate()
print("Created Spark Session")

# render DataFrames as tables when they are the last expression of a cell
spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
print("Set Good Looking Dataframes")

# use Apache Arrow for Spark <-> pandas conversions
spark.conf.set('spark.sql.execution.arrow.pyspark.enabled', True)
print("Using Apache Arrow")

21/08/15 13:11:41 WARN Utils: Your hostname, KAGE-LAPTOP resolves to a loopback address: 127.0.1.1; using 172.19.188.199 instead (on interface eth0)
21/08/15 13:11:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/08/15 13:11:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
INFO:SparkMonitorKernel:Client Connected ('127.0.0.1', 36096)


Created Spark Context
Created Spark Session
Set Good Looking Dataframes
Using Apache Arrow


In [2]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql import DataFrame as D
from shutil import rmtree
from os import path

def createSchema(sdf, default_type=None):
    """Build an explicit Spark schema for the taxi-trip CSV files.

    Adapted from the MAST30034 Spark tutorial, with minor changes to
    accommodate the post-2015 column layout.

    Only ``sdf.columns`` is consulted: the returned ``StructType`` has one
    nullable field per column, in the same order as ``sdf``, so it can be
    passed directly to ``spark.read.csv(..., schema=...)``.

    Args:
        sdf: A DataFrame (e.g. read with ``header=True`` and no schema) whose
            column names define the fields and their order.
        default_type: Optional Spark ``DataType`` instance used for columns
            not in the known mapping below. When ``None`` (the default), an
            unknown column raises ``KeyError``.

    Returns:
        A ``StructType`` with one nullable field per column of ``sdf``.

    Raises:
        KeyError: if ``sdf`` has a column with no known type and
            ``default_type`` is ``None``.
    """
    ints = ('VendorID', 'passenger_count', 'RatecodeID', 'payment_type',
            'PULocationID', 'DOLocationID')
    doubles = ('trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount',
               'tolls_amount', 'improvement_surcharge', 'total_amount')
    strings = ('store_and_fwd_flag',)
    # Declared as DateType, which holds no time-of-day. No transformation of
    # `sdf` is needed here: only its column names are used to build the schema.
    dates = ('tpep_pickup_datetime', 'tpep_dropoff_datetime')

    dtypes = {column: IntegerType() for column in ints}
    dtypes.update({column: DoubleType() for column in doubles})
    dtypes.update({column: StringType() for column in strings})
    dtypes.update({column: DateType() for column in dates})

    # Fail with a message naming every unmapped column instead of a bare KeyError.
    unknown = [column for column in sdf.columns if column not in dtypes]
    if unknown and default_type is None:
        raise KeyError(f"No Spark type known for column(s): {unknown}")

    schema = StructType()
    for column in sdf.columns:
        schema.add(column,                              # column name
                   dtypes.get(column, default_type),    # data type
                   True)                                # is nullable?

    return schema

def readDF(setSchema, data_path='./raw_data/taxi_trips'):
    """Read the taxi-trip CSVs with an explicit schema and rename a few columns.

    Uses the notebook-level ``spark`` session.

    Args:
        setSchema: The ``StructType`` to apply when reading (e.g. the result
            of ``createSchema``).
        data_path: CSV file or directory to read. Defaults to the project's
            raw taxi-trip folder, so existing calls behave as before.

    Returns:
        A DataFrame in which ``RatecodeID`` is renamed ``RateCodeID``,
        ``tpep_pickup_datetime`` is renamed ``PUDate`` and
        ``tpep_dropoff_datetime`` is renamed ``DODate``.
    """
    renames = {
        'RatecodeID': 'RateCodeID',
        'tpep_pickup_datetime': 'PUDate',
        'tpep_dropoff_datetime': 'DODate',
    }

    sdf = spark.read.csv(data_path, header=True, schema=setSchema)
    for old_name, new_name in renames.items():
        sdf = sdf.withColumnRenamed(old_name, new_name)

    return sdf
    
    
# First pass: read without a schema only to learn the header's column names,
# which createSchema turns into a typed schema for the real read below.
header_only_sdf = spark.read.csv('./raw_data/taxi_trips', header=True)
typed_schema = createSchema(header_only_sdf)

# Second pass: typed read; store_and_fwd_flag is not kept.
sdf = readDF(typed_schema).drop("store_and_fwd_flag")

n_rows = sdf.count()
n_cols = len(sdf.columns)
print((n_rows, n_cols))



(24412713, 16)


                                                                                

In [None]:
fpath = './raw_data/2018JunToAug.parquet'
# Let Spark replace any previous output itself. Unlike os.path.exists/rmtree,
# which only see the local disk, the 'overwrite' save mode is applied by Spark
# to whatever filesystem `fpath` lives on.
sdf.write.mode('overwrite').parquet(fpath)