# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Current idle_timeout is 2880 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::114652167878:role/AWSGlueAndS3RoleGrupo2
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: f4205a84-0dec-45b5-a697-32989117c4b6
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session f4205a84

#### Loading DataFrames with schema

In [6]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Explicit schemas: without them, spark.read.csv would load every column as a string.
users_schema = (
    StructType()
    .add("#id", StringType(), True)
    .add("country", StringType(), True)
    .add("registered", StringType(), True)
    .add("date", IntegerType(), True)
)

horoscope_schema = (
    StructType()
    .add("Horoscope", StringType(), True)
    .add("TimeStart", StringType(), True)
    .add("TimeEnd", StringType(), True)
    .add("TimeStartInt", IntegerType(), True)
    .add("TimeEndInt", IntegerType(), True)
)


usersSource = "s3://pfinal-p2-grupo2/curated/userid-profile.csv"
horoscopeSource = "s3://pfinal-p2-grupo2/curated/horoscopeCured.csv"
countriesSource = "s3://pfinal-p2-grupo2/curated/countryContinentCured.csv"

df_users = spark.read.option("header", "true").schema(users_schema).csv(usersSource)
df_horoscope = spark.read.option("header", "true").schema(horoscope_schema).csv(horoscopeSource)
df_countries = spark.read.option("header", "true").csv(countriesSource)

root
 |-- #id: string (nullable = true)
 |-- country: string (nullable = true)
 |-- registered: string (nullable = true)
 |-- date: integer (nullable = true)

root
 |-- Horoscope: string (nullable = true)
 |-- TimeStart: string (nullable = true)
 |-- TimeEnd: string (nullable = true)
 |-- TimeStartInt: integer (nullable = true)
 |-- TimeEndInt: integer (nullable = true)

root
 |-- country: string (nullable = true)
 |-- continent: string (nullable = true)


#### Join the Users DataFrame with the Countries DataFrame on country


In [15]:
# The users-side country is dropped, so the remaining country column comes from df_countries.
df_user_countries = df_users.join(df_countries, df_users.country == df_countries.country, 'left').drop(df_users.country)

907
908
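
A point worth checking in this join: because the users-side `country` is dropped, the surviving `country` column comes from `df_countries`. A user whose country has no match therefore ends up with a null `country` and `continent`, and a country listed more than once in the lookup produces one row per listing for each of its users. The plain-Python sketch below mimics that behavior without Spark. The rows are made up for illustration (`Atlantis` and the duplicated `Japan` entry are not from the dataset), and the helper name `left_join_keep_right_key` is hypothetical.

```python
def left_join_keep_right_key(users, countries):
    """Left join on country, keeping the lookup-side country column."""
    # Group lookup rows by key so duplicate keys are preserved.
    lookup = {}
    for row in countries:
        lookup.setdefault(row["country"], []).append(row)

    joined = []
    for user in users:
        # An unmatched user still yields exactly one row, filled with None.
        for match in lookup.get(user["country"], [None]):
            joined.append({
                "#id": user["#id"],
                "country": match["country"] if match else None,
                "continent": match["continent"] if match else None,
            })
    return joined


# Illustrative data only, not taken from the real CSV files.
users = [
    {"#id": "user_a", "country": "Japan"},
    {"#id": "user_b", "country": "Atlantis"},
]
countries = [
    {"country": "Japan", "continent": "Asia"},
    {"country": "Japan", "continent": "Asia"},  # duplicate lookup row
]

joined = left_join_keep_right_key(users, countries)
for row in joined:
    print(row)
```

If null countries do turn up in the real data, dropping `df_countries.country` instead of `df_users.country` would keep the value from the users file.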


#### Join the Users_Countries DataFrame with the Horoscope DataFrame by date range


In [12]:
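from pyspark.sql import functions as F
# between() is inclusive on both bounds; the range columns and date are dropped after the join.
date_in_range = F.col("date").between(F.col("TimeStartInt"), F.col("TimeEndInt"))
df_hor_users_country = df_user_countries.join(df_horoscope, date_in_range, "left").drop("TimeStart", "TimeEnd", "TimeStartInt", "TimeEndInt", "date")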

+-----------+----------+------------------+---------+---------+
|        #id|registered|           country|continent|Horoscope|
+-----------+----------+------------------+---------+---------+
|user_000533|2009-11-05|             Japan|     Asia|    Libra|
|user_000497|2008-02-16|     United States| Americas|Capricorn|
|user_000872|2008-01-31|            Brazil| Americas|Capricorn|
|user_000651|2007-12-17|     United States| Americas|  Scorpio|
|user_000650|2007-12-16|     United States| Americas|  Scorpio|
|user_000435|2007-10-18|            Poland|   Europe|    Virgo|
|user_000775|2007-10-09|           Finland|   Europe|    Virgo|
|user_000956|2007-09-25|            Poland|   Europe|    Virgo|
|user_000579|2007-09-24|    United Kingdom|   Europe|    Virgo|
|user_000500|2007-09-23|     United States| Americas|    Virgo|
|user_000218|2007-09-19|       Switzerland|   Europe|      Leo|
|user_000786|2007-09-17|Russian Federation|   Europe|      Leo|
|user_000246|2007-09-02|    United Kingd
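
The range join relies on two properties: `between` is inclusive at both ends, and the left join keeps users whose `date` falls inside no range, giving them a null `Horoscope`. A date covered by two overlapping ranges would yield two rows. Here is a minimal plain-Python sketch of that logic, not a Spark implementation. The sign names and integer bounds are hypothetical placeholders, since the actual encoding used in `horoscopeCured.csv` is not shown here.

```python
# Hypothetical (name, start, end) ranges; real values come from horoscopeCured.csv.
RANGES = [("SignA", 101, 131), ("SignB", 201, 229)]


def left_range_join(dates, ranges):
    """Mimic a left join on start <= date <= end (inclusive on both bounds)."""
    rows = []
    for date in dates:
        matches = [name for name, start, end in ranges if start <= date <= end]
        if matches:
            # One output row per matching range.
            rows.extend((date, name) for name in matches)
        else:
            # Left join: keep the unmatched date with a null sign.
            rows.append((date, None))
    return rows


result = left_range_join([101, 131, 150, 229], RANGES)
print(result)
```

Counting rows with a null `Horoscope` after the real join is a quick way to confirm that the ranges cover every value of `date`.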

#### Write the DataFrame as Parquet files partitioned by country


In [16]:
df_hor_users_country.write.partitionBy("country").parquet("s3://pfinal-p2-grupo2/refined/TablaMaestra/tablamaestra.parquet")

df_hor_users_country.write.csv("s3://pfinal-p2-grupo2/csv/TablaMaestra")
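
Both writes use Spark's default save mode, which raises an error when the target path already exists, so re-running the cell fails on the second attempt. A minimal sketch of a re-runnable variant follows, assuming it is acceptable to overwrite earlier output. The helper name `write_master_table` and the CSV `header` option are additions for illustration and are not part of the original notebook.

```python
def write_master_table(df, parquet_path, csv_path):
    """Write df as country-partitioned Parquet and as CSV, replacing old output."""
    # mode("overwrite") replaces whatever is already stored at the target path.
    df.write.mode("overwrite").partitionBy("country").parquet(parquet_path)
    # Assumption: a header row is wanted in the CSV copy.
    df.write.mode("overwrite").option("header", True).csv(csv_path)


# In the notebook this could be called as:
# write_master_table(
#     df_hor_users_country,
#     "s3://pfinal-p2-grupo2/refined/TablaMaestra/tablamaestra.parquet",
#     "s3://pfinal-p2-grupo2/csv/TablaMaestra",
# )
```

With `partitionBy("country")`, Spark writes each partition into a `country=<value>` subdirectory under the Parquet path, and rows whose `country` is null go to `country=__HIVE_DEFAULT_PARTITION__`.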


