# Project Title
### Data Engineering Capstone Project

#### Project Summary
--describe your project at a high level--

The project follows the follow steps:
* Step 1: Scope the Project and Gather Data
* Step 2: Explore and Assess the Data
* Step 3: Define the Data Model
* Step 4: Run ETL to Model the Data
* Step 5: Complete Project Write Up


In [1]:
## NOTES
# Install a pip package in the current Jupyter kernel

#import sys
#!{sys.executable} -m pip install s3fs
#!{sys.executable} -m pip install boto
#!{sys.executable} -m pip install boto3
#!{sys.executable} -m pip install pyspark

In [2]:
# IMPORTS AND INSTALLS

import pandas as pd
import configparser
from datetime import datetime
import os

import boto
import botocore
import boto3
from boto.s3.key import Key
#  https://stackoverflow.com/questions/30249069/listing-contents-of-a-bucket-with-boto3

from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col

In [3]:
# SETUP FOR USING S3 BUCKETS WHERE THE DATAFILES ARE

config = configparser.ConfigParser()
config.read('dl.cfg')

os.environ['AWS_ACCESS_KEY_ID']=config['KEYS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY']=config['KEYS']['AWS_SECRET_ACCESS_KEY']

S3_URI = "s3a://raul-udacity/"
#s3a vs s3 explanation https://stackoverflow.com/questions/33356041/technically-what-is-the-difference-between-s3n-s3a-and-s3

data_bares = 'bares.csv'
data_restaurantes = 'restaurantes.csv'
data_cafeterias = 'cafeterias.csv'

data_asociaciones = 'AsociacionesJCyL.csv'
data_clubes_deportivos = 'Clubes deportivos.csv'

data_bibliotecas = 'Directorio de Bibliotecas de Castilla y León.json'
data_museos = 'Directorio de Museos de Castilla y León.json'

data_poblacion = 'Cities population per gender age.csv'

# Other available data we decided not to use
# Poblacion municipio sexo relacion nacimiento residencia.json
# Municipios Origen Nacimiento.csv
# 

# Step 1: Scope the Project and Gather Data

## Scope 
Explain what you plan to do in the project in more detail. What data do you use? What is your end solution look like? What tools did you use? etc>
Scope.md file

## Describe and Gather Data 
Describe the data sets you're using. Where did it come from? What type of information is included? 
https://github.com/rantoncuadrado/udacity_capstone_project/blob/main/Datasources%20Description.md
Datasources Description.md file

### COPY FILES FROM s3 TO LOCAL

In [4]:
# COPY FILES FROM s3 TO LOCAL. LIST FILENAMES (KEYS)
# We need to download the files and then read with spark_session.read.csv or read.json from the local directory
# because of this error https://github.com/boto/boto3/issues/2566


bucket_name = 'raul-udacity'
local_path = './input_files/'

# connect to the bucket
conn = boto.connect_s3(os.environ['AWS_ACCESS_KEY_ID'],os.environ['AWS_SECRET_ACCESS_KEY'])

# Explanation client and resource
s3_resource = boto3.resource('s3')
s3 = boto3.client('s3')
resp=s3.list_objects_v2(Bucket=bucket_name )

# List file names
files = []
for obj in resp['Contents']:
        files.append(obj['Key'])

print(files)

# This would clone a file in a s3 bucket
# s3_resource.Object('raul-udacity','bares2.csv').copy_from(CopySource='raul-udacity/bares.csv')
# And this would read it df=spark.read.json("s3a://udacity-dend/log_json_path.json")

for file in files:
    try:
        s3_resource.Bucket(bucket_name).download_file(file, local_path+file)
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == "404":
            print("The object does not exist.")
        else:
            raise

ClientError: An error occurred (AuthorizationHeaderMalformed) when calling the ListObjectsV2 operation: The authorization header is malformed; the Credential is mal-formed; expecting "<YOUR-AKID>/YYYYMMDD/REGION/SERVICE/aws4_request".

# Step 2: Explore and Assess the Data
## Explore the Data 
Identify data quality issues, like missing values, duplicate data, etc.

## Cleaning Steps
Once we have the files in local filesystem, I'll use dataframes to clean the data and later SPARK to manipulate them.

In [12]:
# Create an SPARK SESSION

spark_session = SparkSession \
        .builder \
        .config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.0") \
        .config("spark.hadoop.fs.s3a.access.key",os.environ['AWS_ACCESS_KEY_ID']) \
        .config("spark.hadoop.fs.s3a.secret.key",os.environ['AWS_SECRET_ACCESS_KEY']) \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.jars.packages","saurfang:spark-sas7bdat:2.0.0-s_2.11") \
        .getOrCreate()


### CLEANING BAR, RESTAURANT, CAFE
These 3 files share same schema

In [34]:
Sparkdf_bar         = spark_session.read.options(inferSchema='true',\
                                delimiter=';',\
                                header='true',\
                                encoding='ISO-8859-1')\
                                .csv('./input_files/bares.csv')

Sparkdf_restaurante = spark_session.read.options(inferSchema='true',\
                                delimiter=';',\
                                header='true',\
                                encoding='ISO-8859-1')\
                                .csv('./input_files/restaurantes.csv')

Sparkdf_cafeteria   = spark_session.read.options(inferSchema='true',\
                                delimiter=';',\
                                header='true',\
                                encoding='ISO-8859-1')\
                                .csv('./input_files/cafeterias.csv')

print(Sparkdf_bar.describe())
print(Sparkdf_restaurante.describe())
print(Sparkdf_cafeteria.describe())   

DataFrame[summary: string, N.Registro: string, Especialidades: string, Nombre: string, Dirección: string, C.Postal: string, Provincia: string, Municipio: string, Localidad: string, Nucleo: string, Teléfono 1: string, Teléfono 2: string, Teléfono 3: string, Email: string, web: string, Q Calidad: string, Plazas: string, GPS.Longitud: string, GPS.Latitud: string, accesible a personas con discapacidad: string, _c19: string]
DataFrame[summary: string, N.Registro: string, Tipo: string, Categoría: string, Especialidades: string, Nombre: string, Dirección: string, C.Postal: string, Provincia: string, Municipio: string, Localidad: string, Nucleo: string, Teléfono 1: string, Teléfono 2: string, Teléfono 3: string, Email: string, web: string, Q Calidad: string, Plazas: string, GPS.Longitud: string, GPS.Latitud: string, accesible a personas con discapacidad: string, _c21: string]
DataFrame[summary: string, N.Registro: string, Tipo: string, Categoría: string, Nombre: string, Dirección: string, C.Po

In [48]:
## We just need some columns
Sparkdf_bar=Sparkdf_bar.select(
            col('Nombre').alias('name'),
            col('Dirección').alias('address'),
            col('Provincia').alias('county'),
            col('Municipio').alias('city'),
            col('`C.Postal`').alias('postal_code')
        ).distinct()

Sparkdf_restaurante=Sparkdf_restaurante.select(
            col('Nombre').alias('name'),
            col('Dirección').alias('address'),
            col('Provincia').alias('county'),
            col('Municipio').alias('city'),
            col('`C.Postal`').alias('postal_code')
        ).distinct()

Sparkdf_cafeteria=Sparkdf_cafeteria.select(
            col('Nombre').alias('name'),
            col('Dirección').alias('address'),
            col('Provincia').alias('county'),
            col('Municipio').alias('city'),
            col('`C.Postal`').alias('postal_code')
        ).distinct()

 

In [50]:
Sparkdf_garitos = (
        Sparkdf_bar.union(Sparkdf_restaurante).union(Sparkdf_cafeteria)
    )

Sparkdf_garitos.describe()

DataFrame[summary: string, name: string, address: string, county: string, city: string, postal_code: string]

In [51]:
df=Sparkdf_garitos.toPandas()
df.describe(include='all')

Unnamed: 0,name,address,county,city,postal_code
count,22487,22426,22487,22487,22472
unique,16100,21087,9,1730,2025
top,LA PLAZA,"PLAZA MAYOR, 2",León,Valladolid,24003
freq,87,20,4942,2668,369


In [59]:
## we need a correspondence city - postal code so checking empty postal_code cases

Sparkdf_garitos=Sparkdf_garitos.select(
            'name',
            'address',
            'county',
            'city',
            'postal_code'
            ).where(col('postal_code').isNull())

Sparkdf_garitos.head(5)

Py4JJavaError: An error occurred while calling o679.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 85.0 failed 1 times, most recent failure: Lost task 0.0 in stage 85.0 (TID 1698) (192.168.1.35 executor driver): java.io.FileNotFoundException: File file:/Users/raulantoncuadrado/git/udacity_capstone_project/input_files/restaurantes.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2253)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2202)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2201)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2201)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1078)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1078)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2440)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2382)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2371)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:868)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2202)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2223)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2242)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:472)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:425)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.$anonfun$collectToPython$1(Dataset.scala:3519)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:772)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:3516)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File file:/Users/raulantoncuadrado/git/udacity_capstone_project/input_files/restaurantes.csv does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:124)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:169)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:93)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.agg_doAggregateWithKeys_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:132)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


# Step 3: Define the Data Model
## 3.1 Conceptual Data Model
Map out the conceptual data model and explain why you chose that model

## 3.2 Mapping Out Data Pipelines
List the steps necessary to pipeline the data into the chosen data model

# Step 4: Run Pipelines to Model the Data 
## 4.1 Create the data model
Build the data pipelines to create the data model.


## 4.2 Data Quality Checks
Explain the data quality checks you'll perform to ensure the pipeline ran as expected. These could include:
 * Integrity constraints on the relational database (e.g., unique key, data type, etc.)
 * Unit tests for the scripts to ensure they are doing the right thing
 * Source/Count checks to ensure completeness
 
Run Quality Checks

## 4.3 Data dictionary 
Create a data dictionary for your data model. For each field, provide a brief description of what the data is and where it came from. You can include the data dictionary in the notebook or in a separate file.

## Step 5: Complete Project Write Up
* Clearly state the rationale for the choice of tools and technologies for the project.
* Propose how often the data should be updated and why.
* Write a description of how you would approach the problem differently under the following scenarios:
 * The data was increased by 100x.
 * The data populates a dashboard that must be updated on a daily basis by 7am every day.
 * The database needed to be accessed by 100+ people.