In [50]:
# Standard library first, then PySpark.
import getpass

from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructType, StructField, StringType, DateType,
    DecimalType, IntegerType, TimestampType,
)

# Local Spark session with the Redshift JDBC driver jar on the classpath.
# NOTE(review): if a session already exists in this kernel, getOrCreate() may
# reuse it and the `spark.jars` setting may not take effect — restart the kernel
# after changing the jar path.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("ETL Pipeline")
    .config("spark.jars", "../jars/redshift-jdbc42-2.1.0.9.jar")
    .getOrCreate()
)

# Only WARN-and-above Spark log messages, to keep notebook output readable.
spark.sparkContext.setLogLevel("WARN")

In [51]:
# Directory holding one CSV per indicator, named "<indicator>.csv".
source_data_directory = "./financial_data/"

# Indicator names double as temp-view names in Spark SQL; every one shares the
# "economic_indicator" prefix followed by the series code.
economic_indicators = [
    "economic_indicator" + series
    for series in (
        "CPI",
        "CONSUMER_SENTIMENT",
        "DURABLES",
        "INFLATION_EXPECTATION",
        "NONFARM_PAYROLL",
        "RETAIL_SALES",
        "TREASURY_YIELD",
        "UNEMPLOYMENT",
    )
]

# Shared CSV input schema: a date column and a fixed-point value with 2 decimals.
schema = (
    StructType()
    .add("timestamp", DateType())
    .add("value", DecimalType(38, 2))
)


In [52]:
# One DataFrame per indicator CSV, keyed by the indicator name. The header row is
# skipped and the explicit `schema` is applied instead of inferring types.
dictionary_df = {
    name: spark.read.option("header", "true").csv(
        f"{source_data_directory}{name}.csv", schema=schema
    )
    for name in economic_indicators
}

In [53]:
# Expose each indicator DataFrame as a temp view with the same name, so Spark SQL
# queries can reference it directly.
for view_name in economic_indicators:
    dictionary_df[view_name].createOrReplaceTempView(view_name)
    

In [54]:
def _indicator_join_sql(tables, spine="economic_indicatorCPI", prefix="economic_indicator"):
    """Build Spark SQL that LEFT OUTER JOINs every indicator view onto `spine` by timestamp.

    Args:
        tables: Temp-view names, one per indicator; must include `spine`.
        spine: View whose rows drive the result. Every spine row is kept, and
            its `timestamp` becomes the `date` column.
        prefix: Shared name prefix stripped from each view name to form the
            `<SUFFIX>_value` alias (e.g. economic_indicatorCPI -> CPI_value).

    Returns:
        The SQL text. Columns are `date`, then one `<SUFFIX>_value` per view,
        spine first and the remaining views in `tables` order.

    Raises:
        ValueError: if `spine` is not in `tables`.
    """
    if spine not in tables:
        raise ValueError(f"spine view {spine!r} is not among the indicator views")

    def value_alias(table):
        # Strip the shared prefix so the column is e.g. CPI_value, not economic_indicatorCPI_value.
        suffix = table[len(prefix):] if table.startswith(prefix) else table
        return f"{suffix}_value"

    ordered = [spine] + [t for t in tables if t != spine]
    select_list = ",\n       ".join(
        [f"{spine}.timestamp AS date"]
        + [f"{t}.value AS {value_alias(t)}" for t in ordered]
    )
    joins = "\n".join(
        f"LEFT OUTER JOIN {t} ON {spine}.timestamp = {t}.timestamp"
        for t in ordered[1:]
    )
    return f"SELECT {select_list}\nFROM {spine}\n{joins}"


# Join every indicator onto the CPI series by date. The query is generated from
# `economic_indicators`, so the SELECT list and the JOIN chain stay in step with the
# registered views. This replaces a hand-built string that repeated the list and
# mixed `+` with implicit adjacent-literal concatenation.
dataDF = spark.sql(_indicator_join_sql(economic_indicators))

In [55]:
# A LEFT OUTER JOIN keeps every CPI (spine) row, but a repeated timestamp in any
# joined indicator would silently multiply rows. Guard against that fan-out before
# loading, then display the row count as before.
row_count = dataDF.count()
spine_count = dictionary_df["economic_indicatorCPI"].count()
assert row_count == spine_count, (
    f"join fan-out: {row_count} joined rows vs {spine_count} CPI rows; "
    "check the indicator CSVs for duplicate timestamps"
)
row_count

1318

In [56]:
# Preview the first 20 joined rows (long cell values truncated).
dataDF.show(n=20, truncate=True)

+----------+---------+------------------------+--------------+---------------------------+---------------------+------------------+--------------------+------------------+
|      date|CPI_value|CONSUMER_SENTIMENT_value|DURABLES_value|INFLATION_EXPECTATION_value|NONFARM_PAYROLL_value|RETAIL_SALES_value|TREASURY_YIELD_value|UNEMPLOYMENT_value|
+----------+---------+------------------------+--------------+---------------------------+---------------------+------------------+--------------------+------------------+
|2022-10-01|   298.01|                    null|          null|                       null|            154369.00|         597492.00|                3.98|              3.70|
|2022-09-01|   296.81|                   58.60|     290199.00|                       4.70|            153197.00|         576853.00|                3.52|              3.50|
|2022-08-01|   296.17|                   58.20|     279323.00|                       4.80|            152674.00|         613416.00|         

In [57]:
from pyspark.sql.functions import udf


def extract_month(date):
    """Return the month number of `date` as an int, or None when `date` is None."""
    return None if date is None else int(date.month)


def extract_year(date):
    """Return the year of `date` as an int, or None when `date` is None."""
    return None if date is None else int(date.year)


def extract_day(date):
    """Return the day of the month of `date` as an int, or None when `date` is None."""
    return None if date is None else int(date.day)


# Spark UDF wrappers around the helpers above; each produces an IntegerType column.
udf_month = udf(extract_month, returnType=IntegerType())
udf_year = udf(extract_year, returnType=IntegerType())
udf_day = udf(extract_day, returnType=IntegerType())

In [58]:
from pyspark.sql import functions as F

# Derive calendar parts with Spark's built-in date functions. These run in the JVM;
# a Python UDF instead ships every row to a Python worker and back. Like the UDFs,
# they yield integer columns and null for a null date.
day_month_year_DF = (
    dataDF
    .withColumn("month", F.month("date"))
    .withColumn("year", F.year("date"))
    .withColumn("day", F.dayofmonth("date"))
)
day_month_year_DF.show()

+----------+---------+------------------------+--------------+---------------------------+---------------------+------------------+--------------------+------------------+-----+----+---+
|      date|CPI_value|CONSUMER_SENTIMENT_value|DURABLES_value|INFLATION_EXPECTATION_value|NONFARM_PAYROLL_value|RETAIL_SALES_value|TREASURY_YIELD_value|UNEMPLOYMENT_value|month|year|day|
+----------+---------+------------------------+--------------+---------------------------+---------------------+------------------+--------------------+------------------+-----+----+---+
|2022-10-01|   298.01|                    null|          null|                       null|            154369.00|         597492.00|                3.98|              3.70|   10|2022|  1|
|2022-09-01|   296.81|                   58.60|     290199.00|                       4.70|            153197.00|         576853.00|                3.52|              3.50|    9|2022|  1|
|2022-08-01|   296.17|                   58.20|     279323.00|   

In [59]:
# The year/month/day columns now carry the date, so the raw `date` column is
# dropped from the frame that gets loaded.
finalDF = day_month_year_DF.drop("date")


In [60]:
# Inspect the column names/types of the frame that is written to the JDBC target.
finalDF.printSchema()


root
 |-- CPI_value: decimal(38,2) (nullable = true)
 |-- CONSUMER_SENTIMENT_value: decimal(38,2) (nullable = true)
 |-- DURABLES_value: decimal(38,2) (nullable = true)
 |-- INFLATION_EXPECTATION_value: decimal(38,2) (nullable = true)
 |-- NONFARM_PAYROLL_value: decimal(38,2) (nullable = true)
 |-- RETAIL_SALES_value: decimal(38,2) (nullable = true)
 |-- TREASURY_YIELD_value: decimal(38,2) (nullable = true)
 |-- UNEMPLOYMENT_value: decimal(38,2) (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- day: integer (nullable = true)



In [61]:
import os

# Redshift JDBC connection settings. Each value can be overridden from the
# environment (REDSHIFT_URL, REDSHIFT_TABLE, REDSHIFT_USER, REDSHIFT_PASSWORD),
# so the cluster endpoint and credentials need not live in the notebook. The
# defaults keep the previous values. The password is only prompted for
# interactively when REDSHIFT_PASSWORD is unset or empty.
#
# NOTE(review): the recorded write attempt failed with
# `SocketTimeoutException: Connect timed out`, i.e. the TCP connection to the
# endpoint never completed. This is usually a network-reachability problem
# (cluster not publicly accessible, or no inbound rule for port 5439 from this
# machine), not a credentials problem — confirm before retrying.
driver = "com.amazon.redshift.jdbc42.Driver"
url = os.environ.get(
    "REDSHIFT_URL",
    "jdbc:redshift://redshift-cluster-1.cfaj06ovlgm3.us-east-1.redshift.amazonaws.com:5439/dev",
)
dbtable = os.environ.get("REDSHIFT_TABLE", "public.financial")
user = os.environ.get("REDSHIFT_USER", "awsuser")
password = os.environ.get("REDSHIFT_PASSWORD") or getpass.getpass()

········


In [64]:
# Load the final frame into the Redshift table over the generic JDBC source.
# NOTE: mode "append" inserts the rows again on every run, so re-running this
# cell presumably duplicates data unless the target table is cleared first.
(
    finalDF.write
    .format("jdbc")
    .option("url", url)
    .option("driver", driver)
    .option("dbtable", dbtable)
    .option("user", user)
    .option("password", password)
    .mode("append")
    .save()
)

Py4JJavaError: An error occurred while calling o731.save.
: java.sql.SQLException: The connection attempt failed.
	at com.amazon.redshift.util.RedshiftException.getSQLException(RedshiftException.java:56)
	at com.amazon.redshift.Driver.connect(Driver.java:339)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.BasicConnectionProvider.getConnection(BasicConnectionProvider.scala:49)
	at org.apache.spark.sql.execution.datasources.jdbc.connection.ConnectionProviderBase.create(ConnectionProvider.scala:102)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1(JdbcDialects.scala:122)
	at org.apache.spark.sql.jdbc.JdbcDialect.$anonfun$createConnectionFactory$1$adapted(JdbcDialects.scala:118)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:50)
	at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:176)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:584)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:560)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:116)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:860)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:390)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:363)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:247)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.net.SocketTimeoutException: Connect timed out
	at java.base/sun.nio.ch.NioSocketImpl.timedFinishConnect(NioSocketImpl.java:546)
	at java.base/sun.nio.ch.NioSocketImpl.connect(NioSocketImpl.java:597)
	at java.base/java.net.SocksSocketImpl.connect(SocksSocketImpl.java:327)
	at java.base/java.net.Socket.connect(Socket.java:633)
	at com.amazon.redshift.core.RedshiftStream.<init>(RedshiftStream.java:86)
	at com.amazon.redshift.core.v3.ConnectionFactoryImpl.tryConnect(ConnectionFactoryImpl.java:111)
	at com.amazon.redshift.core.v3.ConnectionFactoryImpl.openConnectionImpl(ConnectionFactoryImpl.java:224)
	at com.amazon.redshift.core.ConnectionFactory.openConnection(ConnectionFactory.java:51)
	at com.amazon.redshift.jdbc.RedshiftConnectionImpl.<init>(RedshiftConnectionImpl.java:322)
	at com.amazon.redshift.Driver.makeConnection(Driver.java:502)
	at com.amazon.redshift.Driver.connect(Driver.java:315)
	... 46 more
