##### PySpark Utility for ETL 

<i><b>Configuration:</b>

1. SOURCE DEFINITION FILE (JSON)
2. Transform SQL File

</i>

###### Source Definition File

In [128]:
%%writefile ../configs/config.json

{
    "OUTPUT_DIRECTORY" : "/user/hadoop/pyetl/output",
    "SOURCE" : [
        {
            "TYPE" : "FILE",
            "FORMAT" : "CSV",
            "LOCATION" : "file:\\\\\\C:\\Users\\Kishan\\Desktop\\python_projects\\csv_to_json\\input\\employee.csv",
            "NAME" : "TEMP_TABLE_1",
            "OPTIONS" : {
                "SEP" : ",",
                "HEADER" : "true"
            }
        },
        {
            "TYPE" : "TABLE",
            "TABLE" : "db_name.table_name",
            "NAME" : "TEMP_TABLE_2"
        },
        {
            "TYPE" : "QUERY",
            "QUERY" : "SELECT Name, Department FROM TEMP_TABLE_1",
            "NAME" : "TEMP_TABLE_3"
        },
        {
            "TYPE" : "QUERY",
            "QUERY" : "SELECT * FROM TEMP_TABLE_1",
            "NAME" : "TEMP_TABLE_4"
        }
    ],
    "SQL" : [
        {
            "QUERY" : "SELECT * FROM TEMP_TABLE_1",
            "NAME" : "QUERY_1"
        }
    ]
}


Overwriting ../configs/config.json


In [129]:
# Method to Parse JSON Config File
import json
import logging
import sys
from json import JSONDecodeError

def read_config_file(file_path):
    """Load the ETL configuration from a JSON file.

    Args:
        file_path: Path of the JSON configuration file to read.

    Returns:
        The parsed JSON document (a dict for the config layout written by
        this notebook).

    Raises:
        SystemExit: With exit code 1, after logging the cause, when the file
            does not exist or its content is not valid JSON.
    """
    try:
        # Read as UTF-8 explicitly instead of relying on the platform's
        # locale encoding, which differs between Windows and Linux hosts.
        with open(file_path, 'r', encoding='utf-8') as conf:
            return json.load(conf)
    except FileNotFoundError as f:
        logging.error("File Not Found Exception: {}".format(f))
        logging.error("EXITING")
        sys.exit(1)
    except JSONDecodeError as e:
        logging.error("JSON Exception: {}".format(e))
        logging.error("EXITING")
        sys.exit(1)


In [130]:
# Define Logging

import logging

# Record layout: timestamp - logger name - level - message.
log_frmt = ' - '.join(('%(asctime)s', '%(name)s', '%(levelname)s', '%(message)s'))
# Configure the root logger to emit INFO and above using that layout.
logging.basicConfig(format=log_frmt, level=logging.INFO)

In [133]:
# Location of the JSON written by the %%writefile cell above
# (relative to the notebook's working directory).
config_file_location = "../configs/config.json"

# Parse the configuration; read_config_file exits on a missing/invalid file.
configs = read_config_file(file_path=config_file_location)

# "SOURCE" holds the temp-table definitions, "SQL" the queries to execute.
sources = configs['SOURCE']
queries = configs['SQL']

In [135]:
# Echo both config sections, one entry per line, under a heading each.
for heading, entries in (("TEMP TABLE CONFIGS", sources), ("SQL TO EXECUTE", queries)):
    print(heading)
    for entry in entries:
        print(entry)

TEMP TABLE CONFIGS
{'TYPE': 'FILE', 'FORMAT': 'CSV', 'LOCATION': 'file:\\\\\\C:\\Users\\Kishan\\Desktop\\python_projects\\csv_to_json\\input\\employee.csv', 'NAME': 'TEMP_TABLE_1', 'OPTIONS': {'SEP': ',', 'HEADER': 'true'}}
{'TYPE': 'TABLE', 'TABLE': 'db_name.table_name', 'NAME': 'TEMP_TABLE_2'}
{'TYPE': 'QUERY', 'QUERY': 'SELECT Name, Department FROM TEMP_TABLE_1', 'NAME': 'TEMP_TABLE_3'}
{'TYPE': 'QUERY', 'QUERY': 'SELECT * FROM TEMP_TABLE_1', 'NAME': 'TEMP_TABLE_4'}
SQL TO EXECUTE
{'QUERY': 'SELECT * FROM TEMP_TABLE_1', 'NAME': 'QUERY_1'}


In [70]:
# Start Spark Session
from pyspark.sql import SparkSession

# Build the session used by every later cell, or reuse one that already
# exists; the master is fixed to "local".
spark = (
    SparkSession.builder
    .appName("PYSPARK-ETL-USER-DATETIME-APPNAME")
    .master("local")
    .getOrCreate()
)

In [116]:
import sys

def throw_error(error):
    """Raise a plain ``Exception`` whose message is ``error``.

    Args:
        error: Message (or object) to wrap in the raised ``Exception``.

    Raises:
        Exception: Always.
    """
    failure = Exception(error)
    raise failure

def get_value(source_definition, definition_key):
    """Look up ``definition_key`` in ``source_definition`` without raising.

    Args:
        source_definition: Mapping to read from. May be ``None`` (for example
            an optional nested section that was not configured).
        definition_key: Key to look up.

    Returns:
        The value stored under ``definition_key``, or ``None`` when the key is
        absent or ``source_definition`` itself is ``None``.
    """
    # A missing container is treated like a missing key so that callers can
    # run their own "mandatory parameter" checks instead of hitting a
    # TypeError from the membership test below.
    if source_definition is None:
        return None
    if definition_key in source_definition:
        return source_definition[definition_key]
    return None

def create_temp_table_from_definition(source_definition):
    """Register one source definition as a Spark SQL temporary view.

    Uses the module-level ``spark`` session. The view name is the upper-cased
    ``NAME`` of the definition; ``TYPE`` and ``FORMAT`` are compared
    case-insensitively.

    Supported ``TYPE`` values:
        * ``FILE``  - needs ``FORMAT`` and ``LOCATION``; for ``FORMAT`` ``CSV``
          the ``OPTIONS`` section must provide ``HEADER`` and ``SEP``.
        * ``QUERY`` - needs ``QUERY`` (a Spark SQL statement).
        * ``TABLE`` - needs ``TABLE`` (a table name passed to ``spark.table``).

    Args:
        source_definition: Dict describing one source, as found in the
            ``SOURCE`` list of the config file.

    Raises:
        Exception: When a mandatory parameter is missing or ``TYPE`` is not
            one of the supported values. Errors raised by Spark itself
            propagate unchanged.
    """
    temp_table_name = get_value(source_definition, "NAME")
    source_type = get_value(source_definition, "TYPE")
    # Validate before normalising: calling .upper() on a missing (None) value
    # would raise an AttributeError instead of a meaningful message.
    if temp_table_name is None or source_type is None:
        throw_error("NAME and TYPE parameter are mandatory for every source definition")
    temp_table_name = temp_table_name.upper()
    source_type = source_type.upper()
    logging.info("CREATING SPARK SQL TEMP TABLE - {}".format(temp_table_name))
    logging.info("SOURCE TYPE - {}".format(source_type))

    if source_type == "FILE":
        file_format = get_value(source_definition, "FORMAT")
        file_location = get_value(source_definition, "LOCATION")
        # Treat an absent OPTIONS section as empty so the mandatory-option
        # check below reports the problem instead of a TypeError.
        file_options = get_value(source_definition, "OPTIONS") or {}
        if file_format is None or file_location is None:
            throw_error("For Source Type 'FILE'. FORMAT and LOCATION parameter are mandatory")
        file_format = file_format.upper()
        logging.info("FILE FORMAT - {}".format(file_format))
        logging.info("FILE LOCATION - {}".format(file_location))
        if file_format == "CSV":
            header_exists = get_value(file_options, "HEADER")
            separator = get_value(file_options, "SEP")
            if header_exists is None or separator is None:
                throw_error("For File Format 'CSV'. HEADER and SEP parameter are mandatory")
            logging.info("FILE OPTIONS - {}".format(file_options))
            # The CSV reader's separator option is named "sep"; the previous
            # "delimited" key is not a CSV option, so SEP had no effect.
            reader = spark.read.options(header=header_exists, sep=separator)
            df = reader.format(file_format).load(file_location)
        else:
            df = spark.read.format(file_format).load(file_location)
    elif source_type == "QUERY":
        query = get_value(source_definition, "QUERY")
        if query is None:
            throw_error("For Source Type 'QUERY'. QUERY parameter is mandatory")
        logging.info("QUERY - {}".format(query))
        df = spark.sql(query)
    elif source_type == "TABLE":
        table = get_value(source_definition, "TABLE")
        if table is None:
            throw_error("For Source Type 'TABLE'. TABLE parameter is mandatory")
        logging.info("TABLE - {}".format(table))
        df = spark.table(table)
    else:
        # Without this branch an unknown TYPE would skip view creation yet
        # still log success below.
        throw_error("Unsupported Source Type '{}'. Expected FILE, QUERY or TABLE".format(source_type))

    df.createOrReplaceTempView(temp_table_name)
    logging.info("TEMP TABLE - {} Created Successfully".format(temp_table_name))
    
# Register every configured source as a temp view, in config order (later
# QUERY sources may reference views created by earlier entries).
for source in sources:
    # TABLE sources are skipped, as in the original cell.
    # NOTE(review): presumably because the sample table is not available in
    # the local session - confirm before enabling.
    # The TYPE check is case-insensitive to match
    # create_temp_table_from_definition, and tolerates a missing TYPE so that
    # the problem is reported through the handler below rather than as an
    # unhandled KeyError.
    if str(source.get('TYPE', '')).upper() == 'TABLE':
        continue
    try:
        create_temp_table_from_definition(source)
    except Exception as e:
        logging.error("Caught Exception while creating SPARK SQL Temporary table: {}".format(e))
        sys.exit(1)
    

2020-12-12 15:53:32,027 - root - INFO - CREATING SPARK SQL TEMP TABLE - TEMP_TABLE_1
2020-12-12 15:53:32,028 - root - INFO - SOURCE TYPE - FILE
2020-12-12 15:53:32,029 - root - INFO - FILE FORMAT - CSV
2020-12-12 15:53:32,030 - root - INFO - FILE LOCATION - file:\\\C:\Users\Kishan\Desktop\python_projects\csv_to_json\input\employee.csv
2020-12-12 15:53:32,032 - root - INFO - FILE OPTIONS - {'SEP': ',', 'HEADER': 'true'}
2020-12-12 15:53:32,238 - root - INFO - TEMP TABLE - TEMP_TABLE_1 Created Successfully
2020-12-12 15:53:32,239 - root - INFO - CREATING SPARK SQL TEMP TABLE - TEMP_TABLE_3
2020-12-12 15:53:32,241 - root - INFO - SOURCE TYPE - QUERY
2020-12-12 15:53:32,244 - root - INFO - QUERY - SELECT Name, Department FROM TEMP_TABLE_1
2020-12-12 15:53:32,264 - root - INFO - TEMP TABLE - TEMP_TABLE_3 Created Successfully
2020-12-12 15:53:32,265 - root - INFO - CREATING SPARK SQL TEMP TABLE - TEMP_TABLE_4
2020-12-12 15:53:32,267 - root - INFO - SOURCE TYPE - QUERY
2020-12-12 15:53:32,270

In [117]:
# Preview the view registered from the FILE source.
(
    spark
    .sql("SELECT * FROM TEMP_TABLE_1")
    .show()
)

+-------------+----------+-----------+------+
|         Name|Department|    Manager|Salary|
+-------------+----------+-----------+------+
|   Robin Hood|      null|       null|   200|
|Arsene Wenger|       Bar| Friar Tuck|    50|
|   Friar Tuck|       Foo| Robin Hood|   100|
|  Little John|       Foo| Robin Hood|   100|
|Sam Allardyce|      null|       null|   250|
|Dimi Berbatov|       Foo|Little John|    50|
+-------------+----------+-----------+------+



In [118]:
# Preview the view registered from the first QUERY source.
(
    spark
    .sql("SELECT * FROM TEMP_TABLE_3")
    .show()
)

+-------------+----------+
|         Name|Department|
+-------------+----------+
|   Robin Hood|      null|
|Arsene Wenger|       Bar|
|   Friar Tuck|       Foo|
|  Little John|       Foo|
|Sam Allardyce|      null|
|Dimi Berbatov|       Foo|
+-------------+----------+



In [120]:
# Row count of the view registered from the second QUERY source.
(
    spark
    .sql("SELECT COUNT(1) AS COUNT FROM TEMP_TABLE_4")
    .show()
)

+-----+
|COUNT|
+-----+
|    6|
+-----+



In [104]:
# Write TEMP_TABLE_4 as CSV, replacing any existing output at the target.
# NOTE(review): the recorded run of this cell failed with
# "java.io.IOException: (null) entry in command string: null chmod 0644";
# this looks like a local Hadoop-on-Windows setup issue rather than a problem
# with the query - confirm. The target is also a hard-coded, user-specific
# path while the config defines OUTPUT_DIRECTORY.
(
    spark
    .sql("SELECT * FROM TEMP_TABLE_4")
    .write
    .mode("overwrite")
    .csv(r'file://////C://Users//Kishan//Desktop//python_projects//pyspark_etl//output//test1')
)

Py4JJavaError: An error occurred while calling o1571.csv.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at org.apache.spark.sql.DataFrameWriter.csv(DataFrameWriter.scala:664)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 35.0 failed 1 times, most recent failure: Lost task 0.0 in stage 35.0 (TID 35, localhost, executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\Kishan\Desktop\python_projects\pyspark_etl\output\test1\_temporary\0\_temporary\attempt_20201212152318_0035_m_000000_35\part-00000-05ec3a0e-1067-44d2-8139-5b249543a14a-c000.csv
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
	... 33 more
Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 C:\Users\Kishan\Desktop\python_projects\pyspark_etl\output\test1\_temporary\0\_temporary\attempt_20201212152318_0035_m_000000_35\part-00000-05ec3a0e-1067-44d2-8139-5b249543a14a-c000.csv
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:789)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStream(CodecStreams.scala:81)
	at org.apache.spark.sql.execution.datasources.CodecStreams$.createOutputStreamWriter(CodecStreams.scala:92)
	at org.apache.spark.sql.execution.datasources.csv.CsvOutputWriter.<init>(CSVFileFormat.scala:177)
	at org.apache.spark.sql.execution.datasources.csv.CSVFileFormat$$anon$1.newInstance(CSVFileFormat.scala:85)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
