In [13]:
import pyspark
from pyspark.sql import SparkSession, types
import pyspark.sql.functions as sf
import pandas as pd
# Start (or reuse) a Spark session on the local master "local[*]" under the app name 'test'.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [2]:
# First pass: read the gzip-compressed CSV with only the header option set,
# i.e. without supplying a schema, then peek at a single row.
df = (
    spark.read
    .option("header", "true")
    .csv('work/fhvhv_tripdata_2021-01.csv.gz')
)

df.show(1)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02682|2021-01-01 00:33:44|2021-01-01 00:49:07|         230|         166|   NULL|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
only showing top 1 row



In [3]:
# Inspect the schema of the header-only read; the output below shows every
# column typed as StringType.
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True)])

In [4]:
# Load a small CSV with pandas and list the dtypes it infers, as a reference
# for writing the explicit Spark schema.
# NOTE(review): work/head.csv is not created by any visible cell — presumably
# the first rows of the trip data, produced outside the notebook; confirm so
# that Restart & Run All works from a clean checkout.
df_pandas = pd.read_csv("work/head.csv")
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

In [6]:
# Explicit schema for the trip data: every column is nullable; the two
# datetime columns are timestamps and the two location ids are integers.
schema = types.StructType([
    types.StructField(column_name, column_type(), True)
    for column_name, column_type in (
        ('hvfhs_license_num', types.StringType),
        ('dispatching_base_num', types.StringType),
        ('pickup_datetime', types.TimestampType),
        ('dropoff_datetime', types.TimestampType),
        ('PULocationID', types.IntegerType),
        ('DOLocationID', types.IntegerType),
        ('SR_Flag', types.StringType),
    )
])

In [7]:
# Re-read the same file, this time applying the explicit schema.
df = (
    spark.read
    .option("header", "true")
    .schema(schema)
    .csv('work/fhvhv_tripdata_2021-01.csv.gz')
)

In [9]:
# Number of partitions currently backing df (the output below shows 1 for
# this single .csv.gz input).
df.rdd.getNumPartitions()


1

In [10]:
# Rebind df to a copy of the data repartitioned into 24 partitions.
# NOTE(review): 24 is a magic number — presumably chosen relative to the
# available cores; confirm or lift into a named constant.
df = df.repartition(24)

In [11]:
# Check the partition count again after the repartition call.
df.rdd.getNumPartitions()

24

In [12]:
# Print the schema as a tree to confirm the explicit column types are in effect.
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



##### UDF

In [15]:
def crazy_stuff(base_num):
    """Turn a dispatching base number into a short prefixed hex id.

    The first character of ``base_num`` is dropped and the rest is parsed as a
    base-10 integer ``num``. The result is ``'<prefix>/<hex>'`` where ``<hex>``
    is ``num`` in lowercase hexadecimal, zero-padded to at least three digits,
    and ``<prefix>`` is:

    * ``'s'`` when ``num`` is divisible by 7,
    * ``'a'`` when ``num`` is divisible by 3 (and not by 7),
    * ``'e'`` otherwise.

    Args:
        base_num: A string such as ``'B02765'``, or ``None``.

    Returns:
        The encoded id as a string, or ``None`` when ``base_num`` is ``None``
        or the text after its first character is not a valid integer.
    """
    # This function is used as a Spark UDF on a nullable column: a null cell
    # arrives here as None, and raising would fail the whole job for one row.
    if base_num is None:
        return None

    try:
        num = int(base_num[1:])
    except ValueError:
        # Empty or non-numeric suffix: yield a null instead of aborting.
        return None

    # Divisibility by 7 is checked first, so multiples of 21 get 's'.
    if num % 7 == 0:
        prefix = 's'
    elif num % 3 == 0:
        prefix = 'a'
    else:
        prefix = 'e'

    return f'{prefix}/{num:03x}'

In [16]:
# Wrap the plain Python function as a Spark UDF that returns string values.
crazy_stuff_udf = sf.udf(crazy_stuff, returnType=types.StringType())

In [18]:
# Add date-only versions of the pickup/dropoff timestamps and the UDF-derived
# base_id, keep five columns, and preview the result.
(
    df
    .withColumn('pickup_date', sf.to_date(df['pickup_datetime']))
    .withColumn('dropoff_date', sf.to_date(df['dropoff_datetime']))
    .withColumn('base_id', crazy_stuff_udf(df['dispatching_base_num']))
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID')
    .show()
)

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/9ce| 2021-01-09|  2021-01-09|          85|          35|
|  e/acc| 2021-01-07|  2021-01-07|          37|          36|
|  e/b38| 2021-01-22|  2021-01-22|         158|         229|
|  e/9ce| 2021-01-20|  2021-01-20|         216|         130|
|  e/9ce| 2021-01-29|  2021-01-29|         160|          48|
|  e/b3b| 2021-01-15|  2021-01-15|         165|          91|
|  e/9ce| 2021-01-02|  2021-01-02|         262|         239|
|  e/b38| 2021-01-20|  2021-01-20|          85|          67|
|  e/acc| 2021-01-17|  2021-01-17|          48|          47|
|  s/acd| 2021-01-17|  2021-01-17|         244|         159|
|  e/acc| 2021-01-21|  2021-01-21|         244|         242|
|  e/9ce| 2021-01-23|  2021-01-23|         234|          12|
|  e/b3c| 2021-01-22|  2021-01-22|         143|         239|
|  s/b13| 2021-01-23|  2