# Basics

Import libraries and functions.

In [1]:
import pandas as pd
import numpy as np
import glob
import os
from pyspark.sql.functions import concat, col, lit, split

<p>The <code>glob</code> library lets us build a list containing only the CSV files whose names end in <em>(Normalized).csv</em>, which are the most useful for statistical analysis. <br>
To simplify later steps, we also create a list of the relevant columns in each CSV <em>('Area','Year','Element','Unit','Value')</em>.</p>

In [2]:
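file_list = glob.glob(os.getcwd()+ "/Data/"+"/**(Normalized).csv")
# Relevant columns kept from every CSV (used later as columns=CC)
CC = ['Area', 'Year', 'Element', 'Unit', 'Value']
file_list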

['C:\\Users\\msanchis\\Documents\\GitHub\\python-data-driven-decisions/Data\\Value_of_Production_E_All_Data_(Normalized).csv']
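Later cells refer to files by their position in `file_list` (for example 3, 34 or 67), and `glob.glob` does not promise any particular order. A minimal sketch of a reproducible listing, assuming alphabetical order is acceptable; the helper name `list_normalized_csvs` is hypothetical and not part of the original notebook:

```python
import glob
import os


def list_normalized_csvs(data_dir):
    """Return the '(Normalized).csv' files in data_dir, sorted by path."""
    pattern = os.path.join(data_dir, "*(Normalized).csv")
    # sorted() keeps the index of each file stable across runs and machines
    return sorted(glob.glob(pattern))
```

It could be called as `list_normalized_csvs(os.path.join(os.getcwd(), "Data"))`. If the resulting order differs from the one used when the notebook was written, the hard-coded file indices in the cells below would need to be re-derived.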

Initiate the Spark session.

In [3]:
from pyspark.sql import SparkSession
  
ss = SparkSession \
    .builder \
    .appName("Our First Spark example") \
    .getOrCreate()

# Standardization of data

In the following cells, we compare all the CSV files to find those that do not meet our standard layout (the columns should be *Area*, *Year*, *Element*, *Item*, *Unit* and *Value*). First, we collect the positions in `file_list` of the files that do not meet the standard into a list called `missing`, which will be useful later on.

In [151]:
# Columns every file is expected to provide
required = ['Area', 'Year', 'Element', 'Item', 'Unit', 'Value']
# For each required column, the indices of the files that lack it
missing_by_column = {name: [] for name in required}
for i, path in enumerate(file_list):
    columns = ss.read.csv(path, header=True).columns
    for name in required:
        if name not in columns:
            missing_by_column[name].append(i)
# Indices of files lacking at least one column, without duplicates,
# ordered by column and then by file index
missing = list(dict.fromkeys(i for name in required for i in missing_by_column[name]))
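The check only needs the header row of each file, so it can also be sketched without starting a Spark job. This is an illustration under assumptions: the helper name `missing_columns` is hypothetical, and the `latin-1` encoding is borrowed from the pandas cells later in this notebook.

```python
import csv

REQUIRED = ["Area", "Year", "Element", "Item", "Unit", "Value"]


def missing_columns(path, required=REQUIRED, encoding="latin-1"):
    """Return the required column names absent from the CSV header at path."""
    with open(path, newline="", encoding=encoding) as handle:
        # Read only the first row; an empty file yields an empty header
        header = next(csv.reader(handle), [])
    return [name for name in required if name not in header]
```

A dictionary such as `{i: missing_columns(p) for i, p in enumerate(file_list)}` would then show, per file index, which columns need attention.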

With this list, we loop over each file that does not meet the standard and inspect it visually to see how it can be adapted.

In [153]:
for i in missing:
    print(i)
    ss.read.csv(file_list[i], sep=',', header=True).show(5, truncate=False)

5
+----------+----------+----------------------+-----------------+---------+----------+------------+---------+------------+----------------+---------+----+--------+---------+----+----+
|Donor Code|Donor     |Recipient Country Code|Recipient Country|Item Code|Item      |Element Code|Element  |Purpose Code|Purpose         |Year Code|Year|Unit    |Value    |Flag|Note|
+----------+----------+----------------------+-----------------+---------+----------+------------+---------+------------+----------------+---------+----+--------+---------+----+----+
|702       |All Donors|2                     |Afghanistan      |22040    |Commitment|6110        |Value US$|10000       |All CRS purposes|1973     |1973|millions|41.627000|Fc  |null|
|702       |All Donors|2                     |Afghanistan      |22040    |Commitment|6110        |Value US$|10000       |All CRS purposes|1974     |1974|millions|40.150527|Fc  |null|
|702       |All Donors|2                     |Afghanistan      |22040    |Commitmen

After analyzing the results, we can conclude the following:
- In file **3**, the missing column is *Element*; since no other column can stand in for it, it is filled with 0 using the `lit` function from `pyspark`.
- In file **34**, *Element* is missing as in file 3, and the units appear in a column called *Currency*, so *Unit* is replaced by *Currency*.
- In files **20**, **21** and **22**, the columns *Indicator* and *Sex* take the place of *Item* and *Element*, respectively.
- In file **32**, the case is similar to file 3: *Item* has no substitute and is filled with 0.
- In files **5** and **37**, the missing column is *Area*, which is substituted by *Recipient Country* later on.
- In files **40** and **64**, *Area* is also missing, but it can be built by concatenating the columns *Reporter Countries* and *Partner Countries*.
- In file **41**, *Item* is taken from *Indicator*, *Element* is built from the household-head sex breakdown and *Measure* columns, and *Area* and *Year* are split out of *Survey*.
- Finally, in file **67**, *Census Year* is used as *Year*.
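The two text-based rules in this list (joining two country columns into *Area*, and splitting *Survey* into an area and a year) can be sketched in plain Python. This is an illustration only: the helper names and sample values are made up, and it assumes the parts are separated by `" - "`.

```python
SEPARATOR = " - "


def join_countries(reporter, partner):
    """Build an Area label from a reporter and a partner country."""
    return f"{reporter}{SEPARATOR}{partner}"


def split_survey(survey):
    """Split a 'place - year' survey label into (area, year)."""
    parts = survey.split(SEPARATOR)
    area = parts[0]
    # If the separator is absent there is no year to extract
    year = parts[1] if len(parts) > 1 else None
    return area, year
```

For a made-up label, `split_survey("Placeland - 2010")` gives `("Placeland", "2010")`. A place name that itself contains `" - "` would be cut at its first separator, so such labels would need a closer look.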

In the following cell, we group the files that need the same fix into lists.

In [127]:
B=[5,37]
C=[20,21,22]
F=[40,64]

Finally, we create the `Raw_Normalized_FAODATA` dataset by applying all the steps described above with `pyspark`.

In [4]:
file0 = ss.read.csv(file_list[0], sep=',',header= True)
file0=file0.select('Area','Year', (concat(col('Item'),lit(' - '), col('Element'),lit(' - '),col('Unit'))).alias('Unique'),'Value')
for i in range (1,len(file_list)):
    file1=ss.read.csv(file_list[i], header=True)
    if i in missing:
        if i == 3:
          file1=file1.withColumn('Element',lit(0))          
        elif i in B:
          file1=file1.withColumn('Area',col('Recipient Country'))          
        elif i in C:
          file1=file1.withColumn('Element',col('Sex'))
          file1=file1.withColumn('Item',col('Indicator'))          
        elif i==32:
          file1=file1.withColumn('Item',lit(0))          
        elif i==34:
          file1=file1.withColumn('Element',lit(0))
          file1=file1.drop('Unit')
          file1=file1.withColumn('Unit',col('Currency'))          
        elif i in F:
          file1=file1.withColumn( 'Area', concat(col("Reporter Countries"), lit(" - "), col("Partner Countries")))          
        elif i==41:
          file1=file1.withColumn( 'Element', concat(col("Breadown by Sex of the Household Head"), lit(" - "), col("Measure")))
          file1=file1.withColumn('Item',col('Indicator'))
          file1=file1.withColumn("Area", split(col("Survey"), " - ").getItem(0)).withColumn("Year", split(col("Survey"), " - ").getItem(1))         
        elif i==67:
          file1=file1.withColumn('Year',col('Census Year'))          
    file1=file1.select('Area','Year', (concat(col('Item'),lit(' - '), col('Element'),lit(' - '),col('Unit'))).alias('Unique'),'Value')
    file0=file0.union(file1)
Raw_Normalized_FAODATA=file0

# Store data to Parquet Table

In [9]:
os.environ

environ{'ALLUSERSPROFILE': 'C:\\ProgramData',
        'APPDATA': 'C:\\Users\\msanchis\\AppData\\Roaming',
        'COMMONPROGRAMFILES': 'C:\\Program Files\\Common Files',
        'COMMONPROGRAMFILES(X86)': 'C:\\Program Files (x86)\\Common Files',
        'COMMONPROGRAMW6432': 'C:\\Program Files\\Common Files',
        'COMPUTERNAME': 'LES009743',
        'COMSPEC': 'C:\\WINDOWS\\system32\\cmd.exe',
        'DRIVERDATA': 'C:\\Windows\\System32\\Drivers\\DriverData',
        'FPS_BROWSER_APP_PROFILE_STRING': 'Internet Explorer',
        'FPS_BROWSER_USER_PROFILE_STRING': 'Default',
        'GPPT_CACHEFOLDER': 'C:\\Windows\\ccmcache',
        'GPPT_INSTALLFOLDER': 'c:\\Installers',
        'GPPT_LOGFOLDER': 'C:\\SCCM_logs',
        'HADOOP_HOME': 'C:\\Program Files\\hadoop-3.2.0',
        'HOMEDRIVE': 'C:',
        'HOMEPATH': '\\Users\\msanchis',
        'JAVA_HOME': 'C:\\Program Files\\Eclipse Foundation\\jdk-11.0.12.7-hotspot',
        'LOCALAPPDATA': 'C:\\Users\\msanchis\\AppData\\Loc

In [10]:
file0 = ss.read.csv(file_list[0], sep=',',header= True).select("Area", "Year", "Item", "Value")
file0.show()
file0.printSchema()
file0.write.parquet("DataRaw.parquet")

+-----------+----+-----------------+------------+
|       Area|Year|             Item|       Value|
+-----------+----+-----------------+------------+
|Afghanistan|1976|Almonds, in shell|49550.000000|
|Afghanistan|1977|Almonds, in shell|45505.000000|
|Afghanistan|1978|Almonds, in shell|60673.000000|
|Afghanistan|1979|Almonds, in shell|53089.000000|
|Afghanistan|1980|Almonds, in shell|50055.000000|
|Afghanistan|1981|Almonds, in shell|40449.000000|
|Afghanistan|1982|Almonds, in shell|55617.000000|
|Afghanistan|1983|Almonds, in shell|49044.000000|
|Afghanistan|1984|Almonds, in shell|53089.000000|
|Afghanistan|1985|Almonds, in shell|45505.000000|
|Afghanistan|1986|Almonds, in shell|50561.000000|
|Afghanistan|1987|Almonds, in shell|45505.000000|
|Afghanistan|1988|Almonds, in shell|45505.000000|
|Afghanistan|1989|Almonds, in shell|44494.000000|
|Afghanistan|1990|Almonds, in shell|48033.000000|
|Afghanistan|1991|Almonds, in shell|45505.000000|
|Afghanistan|1992|Almonds, in shell|50055.000000|


Py4JJavaError: An error occurred while calling o120.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.jobAbortedError(QueryExecutionErrors.scala:496)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:251)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:186)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:106)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:82)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:481)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:457)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:106)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:93)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:91)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:128)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:848)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:382)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:355)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:781)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.UnsatisfiedLinkError: 'boolean org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(java.lang.String, int)'
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access0(Native Method)
	at org.apache.hadoop.io.nativeio.NativeIO$Windows.access(NativeIO.java:793)
	at org.apache.hadoop.fs.FileUtil.canRead(FileUtil.java:1215)
	at org.apache.hadoop.fs.FileUtil.list(FileUtil.java:1420)
	at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:601)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.fs.ChecksumFileSystem.listStatus(ChecksumFileSystem.java:761)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1972)
	at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:2014)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:334)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJobInternal(FileOutputCommitter.java:404)
	at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:377)
	at org.apache.parquet.hadoop.ParquetOutputCommitter.commitJob(ParquetOutputCommitter.java:48)
	at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:182)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$20(FileFormatWriter.scala:240)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.util.Utils$.timeTakenMs(Utils.scala:605)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:240)
	... 42 more
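The root cause at the bottom of the traceback, an `UnsatisfiedLinkError` on `NativeIO$Windows.access0`, commonly appears on Windows when Spark cannot load Hadoop's native helper files (`winutils.exe` and `hadoop.dll`, normally expected in the `bin` folder under `HADOOP_HOME`). A minimal diagnostic sketch under that assumption; the function name is hypothetical, and the check only looks for the files, it does not verify that they match the installed Hadoop version:

```python
import os


def missing_hadoop_native_files(hadoop_home):
    """Return the native helper files not found under <hadoop_home>/bin."""
    expected = ["winutils.exe", "hadoop.dll"]
    bin_dir = os.path.join(hadoop_home, "bin")
    return [name for name in expected
            if not os.path.isfile(os.path.join(bin_dir, name))]
```

Calling `missing_hadoop_native_files(os.environ.get("HADOOP_HOME", ""))` returns an empty list when both files are present; any name it returns is a likely candidate for the failure above.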


# Data integration through pandas

Because we do not initially have enough computing power to process the whole database, we first develop a test run to make sure the approach is scalable.


In this first test run of the loop, we load the data into data frames and concatenate them. We then apply the <code>pivot_table</code> function, which turns the variables previously stored in 'Element' and 'Unit' into column headers and fills the table with the entries of 'Value'.

In [None]:
# First five files only, each restricted to the columns in CC
frames = [
    pd.read_csv(path, sep=',', encoding='latin-1', low_memory=False).reindex(columns=CC)
    for path in file_list[:5]
]
main_dataframe = pd.concat(frames)

# One row per (Area, Year) and one column per (Element, Unit) pair
main_dataframeC = main_dataframe.pivot_table(index=['Area', 'Year'], columns=['Element', 'Unit'], values='Value')
main_dataframeC
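To make the reshaping concrete, here is a small sketch of `pivot_table` on made-up rows that use the same five columns. The values are invented for illustration and do not come from the FAO files.

```python
import pandas as pd

# Made-up long-format rows: one measurement per row
long_df = pd.DataFrame({
    "Area":    ["A", "A", "B", "B"],
    "Year":    [2000, 2000, 2000, 2000],
    "Element": ["Production", "Yield", "Production", "Yield"],
    "Unit":    ["tonnes", "hg/ha", "tonnes", "hg/ha"],
    "Value":   [10.0, 2.0, 30.0, 4.0],
})

# One row per (Area, Year); one column per (Element, Unit) pair
wide_df = long_df.pivot_table(index=["Area", "Year"],
                              columns=["Element", "Unit"],
                              values="Value")
```

Here `wide_df` has two rows and two columns. Note that `pivot_table` aggregates with the mean by default, so if several rows share the same area, year, element and unit, their values are averaged rather than kept separately.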

<p>In the following cell, we concatenate all the files in <code>file_list</code>; they share the same shape because each data frame is restricted to the same columns. <br>
The concatenation produces a single data frame with all the files stacked one on top of another. The final result of this loop is <code>main_dataframe</code>, our Normalized Source Data Model.</p>

In [None]:
# Every file, restricted to the columns in CC and stacked in a single concat
frames = [
    pd.read_csv(path, sep=',', encoding='latin-1', low_memory=False).reindex(columns=CC)
    for path in file_list
]
main_dataframe = pd.concat(frames)

<p>Lastly, to convert the Normalized Source Data Model into the Normalized Integrated Data Model, we use the <code>pivot_table</code> function, which <br>
turns the variables previously stored in <em>'Element'</em> and <em>'Unit'</em> into column headers, while the <em>'Value'</em> column supplies the values of the table.</p>

In [None]:
main_dataframeC = main_dataframe.pivot_table(index=['Area', 'Year'], columns=['Element', 'Unit'], values='Value')
main_dataframeC

# Quality assurance

In the following cell, we make sure that none of the variables of interest from <em>'Element'</em> has been left out, thus checking that the extraction and integration are complete.

In [12]:
if main_dataframe['Element'].nunique() == main_dataframeC.shape[1]:
    print('Data extraction & integration is COMPLETED and CORRECT')
else:
    print('Data extraction & integration is INCOMPLETE')

Data extraction & integration is COMPLETED and CORRECT
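Because the pivot is built on both *Element* and *Unit*, each output column corresponds to an (Element, Unit) pair rather than to an Element alone. A stricter variant of the check can compare the set of pairs directly. This is a sketch on made-up data: `check_pivot_columns` is a hypothetical helper, and it assumes rows with a missing key or value do not contribute a column to the pivot.

```python
import pandas as pd


def check_pivot_columns(long_df, wide_df):
    """Return True if wide_df has exactly one column per (Element, Unit) pair."""
    # Ignore rows that cannot contribute to the pivot
    usable = long_df.dropna(subset=["Area", "Year", "Element", "Unit", "Value"])
    expected = set(zip(usable["Element"], usable["Unit"]))
    return expected == set(wide_df.columns)


# Made-up rows for illustration only
sample_long = pd.DataFrame({
    "Area":    ["A", "A", "B"],
    "Year":    [2000, 2000, 2000],
    "Element": ["Production", "Yield", "Production"],
    "Unit":    ["tonnes", "hg/ha", "tonnes"],
    "Value":   [10.0, 2.0, 30.0],
})
sample_wide = sample_long.pivot_table(index=["Area", "Year"],
                                      columns=["Element", "Unit"],
                                      values="Value")
```

With the notebook's own frames, the call would be `check_pivot_columns(main_dataframe, main_dataframeC)`.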
