In [1]:
import numpy as np
import time
import datetime

import pyspark

from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *


### function to transform date

#date1 = "31/12/2015"
#newdate1 = time.strptime(date1, "%d/%m/%Y")


# Dates must be in the form DD/MM/YYYY, but our data uses DD/MM/YY.
# This function inserts '20' to expand the year to four digits. We can do that because all of our dates are from the same century.

def transform_date(string, index):
    """Return ``string`` with the literal '20' inserted at position ``index``.

    Used to expand a two-digit year to four digits, e.g.
    ``transform_date("31/12/15", 6)`` gives ``"31/12/2015"``.
    """
    head, tail = string[:index], string[index:]
    return ''.join((head, '20', tail))

### This is the structure to access the date data

#print(newdate1.tm_year)
#print(newdate1.tm_mon)
#print(newdate1.tm_mday)

#print(transform_date(date1, 6))


# Create the Spark session for the "romain2" application on the master at
# spark://spark-master:7077 (getOrCreate returns an already running session if there is one)

spark = (
    SparkSession.builder
    .master("spark://spark-master:7077")
    .appName("romain2")
    .config(conf=pyspark.SparkConf())
    .getOrCreate()
)



# Load the declaration CSV file into a DataFrame.
# The first line is used as the header and the column types are inferred from the data.

df1 = (
    spark.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferschema', 'true')
    .load('../Bureau/SOD/TBF110_DECLARATION_DATA_TABLE.csv')
)


# Remove exact duplicate rows (a single dropDuplicates is enough; a second call only adds cost).
# NOTE: rows with a missing INVOICE_DATE are kept and get null year/month/day values
# (slicing a missing date used to crash the job). In production the date field is filled.

dfReadyToTransform = df1.dropDuplicates()


# Parse the invoice date.
# The data uses DD/MM/YY, so '20' is inserted before the two-digit year (same rule as
# transform_date(date, 6)); all of our dates are from the same century.
# to_date with an explicit format requires Spark >= 2.2.

invoiceDateText = col('INVOICE_DATE')
invoiceDate = to_date(
    concat(substring(invoiceDateText, 1, 6), lit('20'), substring(invoiceDateText, 7, 2)),
    'dd/MM/yyyy'
)


# Three columns are created to hold year, month and day.
# They are computed on the same DataFrame as the rest of the row, so every date stays attached
# to its own record. Joining two separately computed DataFrames on monotonically_increasing_id()
# is not safe: the generated ids depend on partitioning and row order, which are not guaranteed
# to be identical between the two computations.

dfReady = (
    dfReadyToTransform
    .withColumn('year', year(invoiceDate))
    .withColumn('month', month(invoiceDate))
    .withColumn('day', dayofmonth(invoiceDate))
)


# Read the second csv file and transform it to a DataFrame
# NOTE(review): df2 is not used by any of the code visible here - confirm it is still needed

df2 = spark.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('../Bureau/SOD/TBF134_CUSTOMS_FILE_GROUP_DATA_TABLE.csv')


# Select all the data we are interested in studying

finaldf = dfReady.select(
    'SENDER_SITE_LOC_ID', 'RECEIVER_SITE_LOC_ID', 'ITEMS_TOTAL_AMOUNT',
    'INVOICED_ENTITY_LOC_ID', 'INVOICING_ENTITY_LOC_ID', 'INVOICE_NUMBER',
    'GROUPING_ID', 'CUSTOMS_REGIME_CODE',
    'year', 'month', 'day',
    'LAT_SENDER', 'LON_SENDER', 'LAT_RECEIVER', 'LON_RECEIVER'
)
finaldf.show()


# Collect the result on the driver and write it to a local CSV file
finaldf.toPandas().to_csv('romain2.csv')


#finaldf.write.csv('romain.csv')


# Stop the Spark session once the job is done
spark.stop()

+------------------+--------------------+------------------+----------------------+-----------------------+--------------+-----------+-------------------+----+-----+---+----------+----------+------------+------------+
|SENDER_SITE_LOC_ID|RECEIVER_SITE_LOC_ID|ITEMS_TOTAL_AMOUNT|INVOICED_ENTITY_LOC_ID|INVOICING_ENTITY_LOC_ID|INVOICE_NUMBER|GROUPING_ID|CUSTOMS_REGIME_CODE|year|month|day|LAT_SENDER|LON_SENDER|LAT_RECEIVER|LON_RECEIVER|
+------------------+--------------------+------------------+----------------------+-----------------------+--------------+-----------+-------------------+----+-----+---+----------+----------+------------+------------+
|             15793|               16021|      200541749408|                 25823|                  16222|       8355417|     137861|               1000|2017|    5|  2|     44.34|     26.03|       48.83|        2.35|
|             15793|               16021|       28521692304|                 25823|                  16222|       8380266|     1