In [1]:
import os

import pandas as pd
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
!wget https://jdbc.postgresql.org/download/postgresql-42.7.1.jar

--2024-01-22 08:51:56--  https://jdbc.postgresql.org/download/postgresql-42.7.1.jar
Resolving jdbc.postgresql.org (jdbc.postgresql.org)... 72.32.157.228, 2001:4800:3e1:1::228
Connecting to jdbc.postgresql.org (jdbc.postgresql.org)|72.32.157.228|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1084174 (1.0M) [application/java-archive]
Saving to: ‘postgresql-42.7.1.jar’


2024-01-22 08:52:01 (883 KB/s) - ‘postgresql-42.7.1.jar’ saved [1084174/1084174]



In [None]:
# Spark configuration that puts the downloaded PostgreSQL JDBC driver jar
# (path relative to the notebook's working directory) on Spark's jar list.
conf = SparkConf().set('spark.jars', 'postgresql-42.7.1.jar')
# Leave the configured object as the cell's displayed value.
conf

In [None]:
# Create (or reuse) the SparkSession for this notebook: local master using all
# available cores, application name "test", the SparkConf built earlier (which
# carries the JDBC jar), and Hive support enabled.
# NOTE(review): with a local[*] master the executor memory/cores and
# spark.cores.max settings below probably have no effect, and cores.max (2) is
# lower than executor.cores (4) -- confirm what was intended (possibly
# spark.driver.memory) before relying on them.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("test")
    .config(conf=conf)
    .config("spark.executor.memory", "3g")
    .config("spark.executor.cores", "4")
    .config("spark.cores.max", "2")
    .enableHiveSupport()
    .getOrCreate()
)

In [None]:
# First pass over the raw data: read the CSV files under data/yellow/ and take
# the column names from the header row. No explicit schema or inferSchema
# option is supplied for this read.
df = (
    spark.read
    .option('header', 'true')
    .csv('data/yellow/')
)

In [None]:
# Preview the frame; these are show()'s default arguments, written out.
df.show(n=20, truncate=True)

In [None]:
# Print the column names and types Spark assigned to the header-only read.
df.printSchema()

In [None]:
# Load only the first 1000 rows of the January 2019 file with pandas to get a
# small sample frame.
df_pandas = pd.read_csv(
    'data/yellow/yellow_tripdata_2019-01.csv.gz',
    nrows=1000,
)

In [None]:
# Display the first five rows of the pandas sample (head()'s default count).
df_pandas.head(n=5)

In [None]:
# Convert the pandas sample to a Spark DataFrame and display the schema Spark
# derives from it (presumably the starting point for the hand-written schema
# used later in the notebook -- confirm).
spark.createDataFrame(df_pandas).schema

In [None]:
# Explicit schema for the yellow-taxi CSV files: one nullable StructField per
# column, in the order listed below. Each entry pairs the column name with the
# Spark type class that is instantiated for it.
schema = types.StructType([
    types.StructField(column_name, spark_type(), nullable=True)
    for column_name, spark_type in [
        ('VendorID', types.IntegerType),
        ('tpep_pickup_datetime', types.TimestampType),
        ('tpep_dropoff_datetime', types.TimestampType),
        ('passenger_count', types.IntegerType),
        ('trip_distance', types.DoubleType),
        ('RatecodeID', types.IntegerType),
        ('store_and_fwd_flag', types.StringType),
        ('PULocationID', types.IntegerType),
        ('DOLocationID', types.IntegerType),
        ('payment_type', types.IntegerType),
        ('fare_amount', types.DoubleType),
        ('extra', types.DoubleType),
        ('mta_tax', types.DoubleType),
        ('tip_amount', types.DoubleType),
        ('tolls_amount', types.DoubleType),
        ('improvement_surcharge', types.DoubleType),
        ('total_amount', types.DoubleType),
        ('congestion_surcharge', types.DoubleType),
    ]
])

In [None]:
# Read the CSV files under data/yellow/ again, this time applying the explicit
# schema defined above; `df` is rebound to the typed frame.
df = (
    spark.read
    .option('header', 'true')
    .schema(schema)
    .csv('data/yellow/')
)

In [None]:
# Preview the typed frame; these are show()'s default arguments, written out.
df.show(n=20, truncate=True)

In [None]:
# Print the schema of the typed frame to check it against the explicit schema.
df.printSchema()

In [None]:
# Cap the data at five million rows.
df_limited = df.limit(5_000_000)

In [None]:
# Write df_limited to the PostgreSQL table taxi_rides_all.yellow_trips_data in
# the `production` database on localhost:5432 over JDBC. mode('overwrite')
# replaces any existing contents of that table.
#
# Credentials are read from the environment rather than stored in the
# notebook: PGUSER (falls back to the previously used account name) and
# PGPASSWORD (required). Export them before starting Jupyter.
pg_user = os.environ.get("PGUSER", "ahona")
pg_password = os.environ.get("PGPASSWORD")
if not pg_password:
    # Fail before Spark starts the write so the cause is obvious.
    raise RuntimeError(
        "PGPASSWORD is not set; export the database password in the "
        "environment before running the JDBC write."
    )

(
    df_limited.write.format("jdbc")
    .options(
        url="jdbc:postgresql://localhost:5432/production",
        dbtable="taxi_rides_all.yellow_trips_data",
        user=pg_user,
        password=pg_password,
        driver="org.postgresql.Driver",
    )
    .mode('overwrite')
    .save()
)