In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

from data_pipeline_functions import get_load_params_from_json, load_to_spark, write_to_cassandra

In [2]:
import getpass
import os
import sys

# Make PySpark use the same interpreter as this kernel. Per Spark's docs,
# PYSPARK_PYTHON covers driver and workers, PYSPARK_DRIVER_PYTHON the driver only.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [3]:
# JSON file mapping each Cassandra table to its Postgres source table and the
# SQL query used to extract it. Set TABLE_MODELS_PATH to use a different
# mapping file without editing the notebook; the default is unchanged.
sql_queries_mappings = os.environ.get("TABLE_MODELS_PATH", "table_models.json")

In [4]:
# Load the mapping {cassandra_table: {"table_name": <postgres table>, "query": <SQL>}}
# that drives the migration loop.
table_mappings = get_load_params_from_json(file_path=sql_queries_mappings)

# Fail fast on malformed entries. The migration loop reads both keys for every
# table, so a missing key would otherwise surface only as a bare KeyError after
# earlier tables had already been written to Cassandra (a partial migration).
_REQUIRED_MAPPING_KEYS = {"table_name", "query"}
_incomplete_mappings = {
    table: sorted(_REQUIRED_MAPPING_KEYS - set(details))
    for table, details in table_mappings.items()
    if not _REQUIRED_MAPPING_KEYS <= set(details)
}
if _incomplete_mappings:
    raise ValueError(
        f"{sql_queries_mappings}: entries missing required keys {_incomplete_mappings}"
    )

print(table_mappings)

{'customer': {'table_name': 'customer', 'query': '(SELECT * FROM customer ) AS customer'}}


## Spark Session

In [5]:
import socket

# Address this notebook advertises as the Spark driver (fed into
# `spark.driver.host` when the SparkConf is built). Set SPARK_DRIVER_HOST to
# override it: resolving the local hostname can return a loopback address or the
# wrong adapter's IP on multi-homed machines (WSL, VPNs), which remote executors
# may not be able to reach. Without the override, behavior is unchanged.
sock_name = socket.gethostname()
host = os.environ.get("SPARK_DRIVER_HOST") or socket.gethostbyname(sock_name)
print(host)

172.29.112.1


In [6]:
# Session resources and networking. The driver host is the address resolved in
# the previous cell. Shuffle service and dynamic allocation are both off, so the
# executor/core limits below are fixed for the session.
# NOTE(review): spark.driver.memory generally only applies if set before the
# driver JVM starts — confirm no SparkContext exists when this conf is used.
conf = pyspark.SparkConf().setAll(
    [
        ("spark.executor.memory", "2g"),
        ("spark.executor.cores", "2"),
        ("spark.driver.host", host),
        ("spark.shuffle.service.enabled", "false"),
        ("spark.dynamicAllocation.enabled", "false"),
        ("spark.cores.max", "2"),
        ("spark.driver.memory", "4g"),
    ]
)
        

In [7]:
# Create a Spark session
# Build (or reuse) the Spark session. spark.jars.packages lists the Maven
# coordinates Spark fetches at startup: the Kafka SQL source and the DataStax
# Cassandra connector (both Scala 2.12 builds), plus the Postgres JDBC driver.
# NOTE(review): the Kafka package does not appear to be used in this notebook —
# drop it if data_pipeline_functions does not need it either.
spark = (
    SparkSession.builder
    .appName("DataMigration")
    .config(
        "spark.jars.packages",
        ",".join(
            [
                "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1",
                "com.datastax.spark:spark-cassandra-connector_2.12:3.0.1",
                "org.postgresql:postgresql:42.2.18",
            ]
        ),
    )
    .config(conf=conf)
    .getOrCreate()
)

In [8]:
# Postgres source and Cassandra target settings. Each value can be overridden
# through an environment variable; the defaults match the original local setup.
PSQL_SERVERNAME = os.environ.get("PSQL_SERVERNAME", "127.0.0.1")
PSQL_PORT = int(os.environ.get("PSQL_PORT", "5432"))
PSQL_DBNAME = os.environ.get("PSQL_DBNAME", "postgres")
PSQL_USERNAME = os.environ.get("PSQL_USERNAME", "postgres")
# Never hard-code the password: read it from the environment, or prompt for it
# interactively so it never lands in the notebook source or its saved outputs.
PSQL_PASSWORD = os.environ.get("PSQL_PASSWORD") or getpass.getpass("Postgres password: ")
KEYSPACE = os.environ.get("CASSANDRA_KEYSPACE", "archive")

In [9]:
# Migrate each mapped table: run its query against Postgres into a Spark
# DataFrame, then write that DataFrame to the Cassandra table named by the key.
for cassandra_table, table_details in table_mappings.items():
    postgres_table, query = table_details['table_name'], table_details['query']

    # Postgres -> Spark, using the query from the mapping file.
    df = load_to_spark(
        spark=spark,
        sql_query=query,
        psql_server=PSQL_SERVERNAME,
        psql_dbname=PSQL_DBNAME,
        psql_port=PSQL_PORT,
        psql_username=PSQL_USERNAME,
        psql_password=PSQL_PASSWORD,
    )

    # Spark -> Cassandra, into KEYSPACE.<cassandra_table>.
    write_to_cassandra(
        df=df,
        keyspace=KEYSPACE,
        cassandra_table=cassandra_table,
        postgres_table=postgres_table,
    )

INFO:data_pipeline_functions:Successfully loaded data from postgres into spark.
INFO:data_pipeline_functions:Data written successfully from customer into keyspace archive.
