In [None]:
#!pip install ctgan pyspark
#!pip install psycopg2-binary psycopg2 SQLAlchemy

In [1]:
import os

import pandas as pd
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.functions import from_utc_timestamp

#from pyspark.sql import SQLContext
#from pyspark.conf import SparkConf
#from sqlalchemy import create_engine

In [2]:
#spark = SparkSession.builder.appName("Python Spark SQL basic example").getOrCreate()

spark = SparkSession.builder.appName("Python Spark 100M example")\
    .config("spark.driver.host", "localhost") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "4") \
    .config("spark.cores.max", "4") \
    .config("spark.driver.memory","4g") \
    .config("spark.executor.extraClassPath", "postgresql-42.7.4.jar") \
    .getOrCreate()

spark

In [3]:
## Download this file from kaggel it is around 1.4GB non compressed 100 Million rows
## url: https://www.kaggle.com/datasets/rohanrao/riiid-train-data-multiple-formats?select=riiid_train.parquet 

path = 'C:/Users/piyus/Downloads/riiid_train/riiid_train.parquet'

df_spark = spark.read.option("header","true").option("recursiveFileLookup","true").parquet(path)

df_spark.head(5)

[Row(row_id=0, timestamp=0, user_id=115, content_id=5692, content_type_id=False, task_container_id=1, user_answer=3, answered_correctly=1, prior_question_elapsed_time=None, prior_question_had_explanation=None),
 Row(row_id=1, timestamp=56943, user_id=115, content_id=5716, content_type_id=False, task_container_id=2, user_answer=2, answered_correctly=1, prior_question_elapsed_time=37000.0, prior_question_had_explanation=False),
 Row(row_id=2, timestamp=118363, user_id=115, content_id=128, content_type_id=False, task_container_id=0, user_answer=0, answered_correctly=1, prior_question_elapsed_time=55000.0, prior_question_had_explanation=False),
 Row(row_id=3, timestamp=131167, user_id=115, content_id=7860, content_type_id=False, task_container_id=3, user_answer=0, answered_correctly=1, prior_question_elapsed_time=19000.0, prior_question_had_explanation=False),
 Row(row_id=4, timestamp=137965, user_id=115, content_id=7922, content_type_id=False, task_container_id=4, user_answer=1, answered_

In [None]:
## Both Works   #print(df_spark.schema)
df_spark.printSchema()

In [None]:
df_spark.count()

In [None]:
## This will only show the unique user ids
#df_spark.select('user_id').distinct().show()

## This will show each user id together with its row count
# Use this only on a small DataFrame: n=df_spark.count() asks show() to print every group
#df_spark.groupby('user_id').count().show(n=df_spark.count(), truncate = False)

#df_spark.groupby('user_id').count().show(50)


In [4]:
## Changing timestamp from second in long format to timestamp
# Adding new Column with transformation
converted_df = df_spark.withColumn("timestamp_column", F.from_unixtime(F.col("timestamp")))

converted_df.show()

+------+---------+-------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+-------------------+
|row_id|timestamp|user_id|content_id|content_type_id|task_container_id|user_answer|answered_correctly|prior_question_elapsed_time|prior_question_had_explanation|   timestamp_column|
+------+---------+-------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+-------------------+
|     0|        0|    115|      5692|          false|                1|          3|                 1|                       NULL|                          NULL|1970-01-01 05:30:00|
|     1|    56943|    115|      5716|          false|                2|          2|                 1|                    37000.0|                         false|1970-01-01 21:19:03|
|     2|   118363|    115|       128|          false|                0|          0|       

In [None]:
## Another way to convert the timestamp from seconds (long format) to a timestamp, shifted to IST
#df1 = df_spark.withColumn('end_time', from_utc_timestamp(F.from_unixtime(F.col("timestamp")), 'IST'))
#df1.show()

In [8]:
# remove old column
converted_df = converted_df.drop('timestamp')
# rename new column
converted_df = converted_df.withColumnRenamed('timestamp_column','timestamp') 

converted_df.show()

+------+-------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+-------------------+
|row_id|user_id|content_id|content_type_id|task_container_id|user_answer|answered_correctly|prior_question_elapsed_time|prior_question_had_explanation|          timestamp|
+------+-------+----------+---------------+-----------------+-----------+------------------+---------------------------+------------------------------+-------------------+
|     0|    115|      5692|          false|                1|          3|                 1|                       NULL|                          NULL|1970-01-01 05:30:00|
|     1|    115|      5716|          false|                2|          2|                 1|                    37000.0|                         false|1970-01-01 21:19:03|
|     2|    115|       128|          false|                0|          0|                 1|                    55000.0|                    

In [9]:
## Create Connection to PostgreSQL.
try:
    conn = psycopg2.connect(dbname="postgres", user='postgres', password='password', port='5432')
    print("Connected to DB !!")
except:
    print("Connection was unsuccessful !!")  

Connected to DB !!


In [11]:
converted_df.printSchema()

root
 |-- row_id: long (nullable = true)
 |-- user_id: integer (nullable = true)
 |-- content_id: short (nullable = true)
 |-- content_type_id: boolean (nullable = true)
 |-- task_container_id: short (nullable = true)
 |-- user_answer: byte (nullable = true)
 |-- answered_correctly: byte (nullable = true)
 |-- prior_question_elapsed_time: float (nullable = true)
 |-- prior_question_had_explanation: boolean (nullable = true)
 |-- timestamp: string (nullable = true)



In [12]:
cur = conn.cursor()

In [13]:
cur.execute("""
CREATE TABLE IF NOT EXISTS testdb.consent (
        row_id BIGINT primary key,
        timestamp TIMESTAMP,
        user_id varchar(20),
        content_id INT,  
		content_type_id boolean,
		task_container_id INT,
		user_answer INT,
		answered_correctly INT,
		prior_question_elapsed_time BIGINT,
		prior_question_had_explanation boolean)
""")

In [14]:
conn.commit()

In [15]:
## You have to keep the jar in right location then only spark will identify it 
## For windows put in spark folder ==> C:\Program Files\spark-3.5.3-bin-hadoop3\jars
## Databricks have diff location

url_connect = 'jdbc:postgresql://localhost:5432/postgres?currentSchema=testdb'
properties = {"user": "postgres","password": "password","driver": "org.postgresql.Driver"}

start = pd.Timestamp.now()

converted_df.write.jdbc(url=url_connect, table="consent", mode="overwrite", properties=properties)

print(pd.Timestamp.now()-start)

0 days 00:12:23.341035
