In [None]:
import os
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType, IntegerType
import pyspark.sql.functions as F

In [None]:
#changing user to hdfs such that it can access files in the hdfs
os.environ["HADOOP_USER_NAME"] = "hdfs"

In [None]:
conf = SparkConf().setAll((
    ("spark.task.maxFailures", "10"),
    ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
    ("spark.sql.execution.arrow.enabled", "true"),
    ("spark.shuffle.service.enabled", "true"),
    ("spark.driver.memory", "12g"),
    ("spark.dynamicAllocation.enabled", "true")))

In [None]:
conf.setAppName("csv_conversion").setMaster("yarn-client")

In [None]:
spark = SparkSession.builder \
    .appName("FNMA Spark -  Notebook 1") \
    .config(conf=conf) \
    .getOrCreate()


In [None]:
spark

In [None]:
schema_acq = [['LoanID', str],
              ['Channel', str],
              ['SellerName', str],
              ['OrInterestRate', float],
              ['OrUnpaidPrinc', int],
              ['OrLoanTerm', int],
              ['OrDate', str],
              ['FirstPayment', str],
              ['OrLTV', float],
              ['OrCLTV', float],
              ['NumBorrow', float],
              ['DTIRat', float],
              ['CreditScore', float],
              ['FTHomeBuyer', str],
              ['LoanPurpose', str],
              ['PropertyType', str],
              ['NumUnits', int],
              ['OccStatus', str],
              ['PropertyState', str],
              ['Zip', int],
              ['MortInsPerc', float],
              ['ProductType', str],
              ['CoCreditScore', float],
              ['MortInsType', float],
              ['RelMortInd', str]]
schema_per = [['LoanID', str],
              ['ReportingDate', str],
              ['Servicer', str],
              ['CurrInterestRate', float],
              ['CAUPB', float],
              ['LoanAge', int],
              ['MonthsToMaturity', float],
              ['AdMonthsToMaturity', float],
              ['MaturityDate', str],
              ['MSA', int],
              ['CurDelStatus', str],
              ['ModFlag', str],
              ['ZeroBalCode', float],
              ['ZeroBalEffDate', str],
              ['LastInstallDate', str],
              ['ForeclosureDate', str],
              ['DispositionDate', str],
              ['ForeclosureCost', float],
              ['RepairCost', float],
              ['AssetRecCost', float],
              ['MiscCostsPF', float],
              ['ATFHP', float],
              ['NetSaleProceeds', float],
              ['CreditEnhProceeds', float],
              ['RPMWP', float],
              ['OtherForePro', float],
              ['NonInterestUPB', float],
              ['PricipleForgiven', float],
              ['RMWPF', str],
              ['FPWA', float],
              ['ServicingIndicator', str]]


In [None]:
schemap = {
    str: StringType(),
    float: FloatType(),
    int: IntegerType()
}

In [None]:
#Loading Acquisition and Performance Datasets in spark sql
schema_acq_spark = StructType([StructField(k, schemap[v], True) for k,v in schema_acq])
schema_per_spark = StructType([StructField(k, schemap[v], True) for k,v in schema_per])

acq = spark.read.load("/data/FNMA/Acquisition_2017Q1.txt", format="csv", header="false",
                     sep='|', schema=schema_acq_spark)

per = spark.read.load("/data/FNMA/Performance_2017*.txt", format="csv", header="false",
                     sep='|', schema=schema_per_spark)


# Joining Acquisition and Performance Datasets from CSV in spark sql and saving the results to a csv file.  

In [None]:

per = per.withColumn('date', F.to_date(per.ReportingDate, 'MM/dd/yyyy')) \
    .drop('ReportingDate') \
    .withColumnRenamed('date', 'ReportingDate') \
    .orderBy("LoanID", F.desc("ReportingDate")) \
    .dropDuplicates(["LoanID"])

df = per.join(F.broadcast(acq), 'LoanID', 'outer').persist()

#Converting the join dataframe to csv
df.write.format('com.databricks.spark.csv').mode('overwrite').save('/data/FNMA/FNMA_2017_Join_result.csv')