In [1]:
# Constants
import os
from urllib.parse import quote


def build_connection_string(user, password, host="localhost", database="pagila"):
    """Return a PostgreSQL connection URL for the given credentials.

    The user name and password are percent-encoded so that characters with a
    special meaning inside a URL (``@``, ``/``, ``:``, ``#`` ...) cannot
    corrupt the resulting string.

    Args:
        user: database user name.
        password: database password.
        host: database host; defaults to ``localhost``.
        database: database name; defaults to ``pagila``.

    Returns:
        A string of the form ``postgresql://user:password@host/database``.
    """
    return "postgresql://{0}:{1}@{2}/{3}".format(
        quote(user, safe=""), quote(password, safe=""), host, database
    )


# Credentials are read from the environment, falling back to 'postgres'.
DB_USER = os.getenv('DB_USER','postgres')
DB_PASSWORD = os.getenv('DB_PASSWORD','postgres')
CONNECTION_STRING = build_connection_string(DB_USER, DB_PASSWORD)

# Read all tables from information schema

In [2]:
import pandas as pd

# List every base table in the public schema.
query = """SELECT table_name
  FROM information_schema.tables
 WHERE table_schema='public'
   AND table_type='BASE TABLE';
"""
table_listing = pd.read_sql_query(query, con=CONNECTION_STRING)

# The staff table holds binary data (e.g. b'acv23243'). Converting it directly
# to StringType() fails with "can't pickle memoryview objects"; it would have
# to be decoded and re-encoded as a string first. For this project the staff
# table is simply left out.
is_convertible = table_listing['table_name'].ne('staff')
df_information_schema = table_listing.loc[is_convertible]
df_information_schema

Unnamed: 0,table_name
0,category
1,country
2,film_category
3,customer
4,actor
5,inventory
6,language
7,rental
8,film_actor
9,address


# Custom function to map pandas data types to Apache Spark data types

In [3]:
# https://spark.apache.org/docs/latest/sql-reference.html
from pyspark.sql.types import *
# pandas data types to Spark data types

def pandas_datatype_to_spark_datatype(input_datatype="object"):
    """Map a pandas dtype (or its name) to a Spark SQL data type instance.

    Callers pass ``str(df.dtypes[col])``. For datetime columns that string is
    ``"datetime64[ns]"`` (or ``"datetime64[ns, <tz>]"``), never the bare
    ``"datetime64"``, so datetimes are matched by prefix. Without that they
    would fall through to ``StringType``.

    Args:
        input_datatype: a pandas/numpy dtype object or its string name.
            Defaults to ``"object"``.

    Returns:
        ``LongType`` for signed integer dtypes, ``DoubleType`` for float
        dtypes, ``TimestampType`` for datetime64 dtypes, ``BooleanType`` for
        ``bool``, and ``StringType`` for everything else (including
        ``"object"`` and ``"string"``).
    """
    # Accept dtype objects as well as plain names.
    dtype_name = str(input_datatype)

    # Narrower widths are widened to Spark's 64-bit types.
    if dtype_name in ("int8", "int16", "int32", "int64"):
        return LongType()
    if dtype_name in ("float16", "float32", "float64"):
        return DoubleType()
    # Covers "datetime64", "datetime64[ns]" and timezone-aware variants.
    if dtype_name.startswith("datetime64"):
        return TimestampType()
    if dtype_name == "bool":
        return BooleanType()
    # "string", "object" and any unrecognised dtype are carried as strings.
    return StringType()
    

# Initialize Spark

In [5]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
# Memory settings for the local driver and executor.
config = (
    SparkConf()
    .set("spark.driver.memory", "2g")
    .set("spark.executor.memory", "1g")
)

# Create the session, or reuse one that is already running.
spark = (
    SparkSession.builder
    .config(conf=config)
    .master("local")
    .appName("postgres to spark")
    .getOrCreate()
)

In [8]:
# Bare expression: the notebook renders the session object as this cell's output.
spark

# Customer Table conversion

In [None]:
%%time
import pandas as pd
from pyspark.sql.types import *

query = "select * from customer"
df_customers = pd.read_sql_query(query,con=CONNECTION_STRING)
spark_schema = StructType([])
for col_name in df_customers.columns.tolist():
    spark_schema.add(col_name, pandas_datatype_to_spark_datatype(str(df_customers.dtypes[col_name])), nullable=True, metadata=None)
spark_schema
spark_df = spark.createDataFrame(df_customers, schema=spark_schema)
spark_df.createOrReplaceTempView("customer")

In [None]:
# Row count of the "customer" view; the value is read from the single result
# row under the column name "count(1)".
spark.sql("select count(*) from customer").first()["count(1)"]

# Convert all tables to spark - Temp tables (metastore)
- Generate query
- Create pandas dataframe from sql query
- Retrieve the pandas column data types and map each one to the corresponding Spark data type with the custom function
- Build spark schema
- Convert the pandas dataframe to a Spark dataframe, passing in the Spark schema built above
- Create Spark temp table

In [None]:
%%time
import pandas as pd

# spark.conf.set("spark.sql.execution.arrow.enabled",  "false")
for table in df_information_schema['table_name'].tolist():
    query = "select * from {0}".format(table)
    df_pandas = pd.read_sql_query(query,con=CONNECTION_STRING)
    spark_schema = StructType([])
    for col_name in df_pandas.columns.tolist():
        spark_schema.add(col_name, pandas_datatype_to_spark_datatype(str(df_pandas.dtypes[col_name])), nullable=True, metadata=None)
    spark_df = spark.createDataFrame(df_pandas, schema=spark_schema)
    spark_df.createOrReplaceTempView(table)

# Verify data

In [None]:
%%time
for table in df_information_schema['table_name'].tolist():
    count = spark.sql("select count(*) from {0}".format(table)).take(1)[0].asDict()["count(1)"]
    print("{0} : {1} ".format(table,count))

In [None]:
# Stop the session used for the temp-view section, then start a fresh one with
# the same settings. The persistent-table cells below still use `spark`, and a
# stopped session cannot run them (this broke Restart & Run All).
# NOTE(review): a new session should also start without the temp views
# registered above, so they no longer shadow the saved tables - confirm.
spark.stop()
spark = SparkSession.builder.config(conf=config).master("local").appName("postgres to spark").getOrCreate()

# Convert all tables to spark - Persistent tables
- Generate query
- Create pandas dataframe from sql query
- Retrieve the pandas column data types and map each one to the corresponding Spark data type with the custom function
- Build spark schema
- Convert the pandas dataframe to a Spark dataframe, passing in the Spark schema built above
- Create Spark table

In [6]:
%%time
import pandas as pd

# spark.conf.set("spark.sql.execution.arrow.enabled",  "false")
for table in df_information_schema['table_name'].tolist():
    query = "select * from {0}".format(table)
    df_pandas = pd.read_sql_query(query,con=CONNECTION_STRING)
    spark_schema = StructType([])
    for col_name in df_pandas.columns.tolist():
        spark_schema.add(col_name, pandas_datatype_to_spark_datatype(str(df_pandas.dtypes[col_name])), nullable=True, metadata=None)
    spark_df = spark.createDataFrame(df_pandas, schema=spark_schema)
    spark_df.write.option("path", "./{0}".format(table)).saveAsTable(table)

CPU times: user 3.71 s, sys: 180 ms, total: 3.89 s
Wall time: 14.2 s


# Verify Data

In [7]:
%%time
for table in df_information_schema['table_name'].tolist():
    count = spark.sql("select count(*) from {0}".format(table)).take(1)[0].asDict()["count(1)"]
    print("{0} : {1} ".format(table,count))

category : 100016 
country : 109 
film_category : 1000 
customer : 599 
actor : 200 
inventory : 4581 
language : 6 
rental : 16044 
film_actor : 5462 
address : 603 
city : 600 
film : 1000 
store : 1002 
CPU times: user 49.1 ms, sys: 13.8 ms, total: 62.8 ms
Wall time: 3.07 s
