# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

#### Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the session's SparkContext and wrap it in a GlueContext; the Glue Job
# handle and the SparkSession used for DataFrame work both come from it.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
job = Job(glueContext)  # NOTE(review): not init()/commit()-ed in the visible cells
spark = glueContext.spark_session

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::424875905672:role/glue-cdl-full-access
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: d3c0beb3-1cea-43c4-87a8-27aa5078aee6
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session d3c0beb3-1

## Auxiliary functions

In [2]:
from pyspark.sql.types import *
from pyspark import SQLContext

# SQLContext bound to the session's SparkContext; pandas_to_spark uses it to
# build Spark DataFrames.
# NOTE(review): SQLContext is a legacy entry point kept for backward
# compatibility; ``spark.createDataFrame`` could be used instead.
sqlContext = SQLContext(sparkContext=sc)
# Auxiliary functions
# pandas -> Spark conversion with an explicit schema (no type inference)
def equivalent_type(f):
    """Return the Spark SQL type to use for a pandas/NumPy dtype.

    ``f`` is matched by its string name (e.g. ``'int64'``), so either a dtype
    object taken from ``DataFrame.dtypes`` or a dtype-name string works.
    Boolean and narrow integer dtypes get a proper Spark type instead of
    silently falling back to ``StringType``; any dtype name not listed below
    maps to ``StringType``.
    """
    type_by_dtype_name = {
        'datetime64[ns]': TimestampType,
        'int64': LongType,
        'uint32': LongType,      # up to 2**32 - 1, does not fit in a 32-bit int
        'int32': IntegerType,
        'int16': IntegerType,
        'int8': IntegerType,
        'uint16': IntegerType,
        'uint8': IntegerType,
        'float64': DoubleType,
        'float32': FloatType,
        'bool': BooleanType,
    }
    return type_by_dtype_name.get(str(f), StringType)()

def define_structure(string, format_type):
    """Build a ``StructField`` named ``string`` for the pandas dtype ``format_type``.

    The Spark type comes from ``equivalent_type``; if that lookup raises, the
    field falls back to ``StringType``.
    """
    try:
        typo = equivalent_type(format_type)
    except Exception:
        # Best-effort fallback to a string column. Catching Exception (not a
        # bare ``except:``) lets KeyboardInterrupt/SystemExit propagate.
        typo = StringType()
    return StructField(string, typo)

def pandas_to_spark(pandas_df):
    """Convert a pandas DataFrame into a Spark DataFrame.

    The Spark schema is built up front from the pandas dtypes (one field per
    column, via ``define_structure``) and passed to ``createDataFrame``, so
    Spark does not have to infer column types from the data.
    """
    schema_fields = [
        define_structure(column_name, column_dtype)
        for column_name, column_dtype in zip(pandas_df.columns, pandas_df.dtypes)
    ]
    return sqlContext.createDataFrame(pandas_df, StructType(schema_fields))




## Read

In [3]:
# Read the raw churn CSVs (every object under the bucket, recursively) into a
# Glue DynamicFrame.
# NOTE(review): "header" is not a Glue S3 connection option, so it appears to
# have no effect here -- the Transform section strips the header row by hand.
# For CSV the header flag belongs in format_options={"withHeader": True}; if
# that is adopted, the manual header fix must be removed at the same time.
raw_s3_options = {
    "paths": ["s3://cloud-eng-bucket/"],
    "recurse": True,
    "header": "true",
}
churn = glueContext.create_dynamic_frame.from_options(
    format="csv",
    connection_type="s3",
    connection_options=raw_s3_options,
)

# DynamicFrame -> Spark DataFrame; every column is still a string here.
churn_df = churn.toDF()




## Transform

In [4]:
# The first row of the raw frame holds the CSV column names: drop every row
# equal to it and use its values as the column names of the new frame.
header = churn_df.rdd.first()
data_rows = churn_df.rdd.filter(lambda row: row != header)
churn_final = spark.createDataFrame(data_rows, list(header))




In [5]:
# Fix data types
from pyspark.sql.functions import col

# Columns that arrive as strings but hold numbers; each one is replaced by its
# double-typed cast under the same name.
cols_to_cast = [
    'SeniorCitizen', 'tenure', 'MonthlyCharges',
    'TotalCharges', 'numAdminTickets', 'numTechTickets',
]
for numeric_col in cols_to_cast:
    churn_final = churn_final.withColumn(numeric_col, col(numeric_col).cast(DoubleType()))




In [7]:
# To pd
import pandas as pd
import numpy as np

churn_pd = churn_final.toPandas()

# Treat blank TotalCharges entries (" ") as missing, then drop every row that
# has any missing value and renumber the index from 0.
df_2 = (
    churn_pd.copy()
    .assign(TotalCharges=lambda frame: frame['TotalCharges'].replace(' ', np.nan))
    .dropna(how='any')
    .reset_index(drop=True)
)

from sklearn.preprocessing import OneHotEncoder
def get_ohe(df, col_name):
    """One-hot encode the categorical columns ``col_name`` of ``df``.

    Parameters
    ----------
    df : pandas.DataFrame
        Input frame. It is not modified.
    col_name : list of str
        Columns to encode (a list, so that ``df[col_name]`` is 2-D).

    Returns
    -------
    tuple of (pandas.DataFrame, OneHotEncoder)
        A new frame holding the non-encoded columns (index reset to 0..n-1)
        followed by one indicator column per category, minus the first
        category of each feature (``drop="first"``), plus the fitted encoder.
        Indicator columns are named ``x<i>_<category>``, where ``i`` is the
        position of the source column in ``col_name``.
    """
    # Fit on a plain array (no column names) so the generated feature names
    # are the positional "x0", "x1", ... on every scikit-learn version,
    # matching what the legacy get_feature_names() produced.
    values = df[col_name].to_numpy()
    try:
        # scikit-learn >= 1.2 renamed ``sparse`` to ``sparse_output``.
        ohe = OneHotEncoder(sparse_output=False, categories="auto", drop="first")
    except TypeError:
        ohe = OneHotEncoder(sparse=False, categories="auto", drop="first")
    encoded = ohe.fit_transform(values)
    if hasattr(ohe, "get_feature_names_out"):
        feature_names = ohe.get_feature_names_out()
    else:  # scikit-learn < 1.0
        feature_names = ohe.get_feature_names()
    temp_df = pd.DataFrame(data=encoded, columns=feature_names)
    # Non-inplace drop: the caller's frame keeps all of its columns.
    remaining = df.drop(columns=col_name).reset_index(drop=True)
    return pd.concat([remaining, temp_df], axis=1), ohe

# String columns to one-hot encode (order matters: it fixes the x<i> prefixes
# of the generated column names). "Churn" is presumably the target label.
categorical_cols = [
    "gender", "Partner", "Dependents", "PhoneService", "MultipleLines",
    "InternetService", "OnlineSecurity", "OnlineBackup", "DeviceProtection",
    "TechSupport", "StreamingTV", "StreamingMovies", "Contract",
    "PaperlessBilling", "PaymentMethod", "Churn",
]
df_3, ohe_obj = get_ohe(df_2, categorical_cols)
# customerID is not carried into the cleaned output.
df_4 = df_3.drop(columns=['customerID'])




## Convert back to a Glue DynamicFrame

In [8]:
from awsglue.dynamicframe import DynamicFrame

# pandas -> Spark (explicit schema via pandas_to_spark) -> Glue DynamicFrame,
# the form the catalog writer in the Write section consumes.
churn_clean_spark = pandas_to_spark(df_4)
churn_clean_dyf = DynamicFrame.fromDF(
    churn_clean_spark,
    glueContext,
    'convert',  # name given to the DynamicFrame
)




## Write

In [12]:
# Housekeeping: make sure the Glue Data Catalog table exists, then write the
# cleaned DynamicFrame through it.
import boto3

database_name = "cloud-eng"
table_name = "telecom_churn"
# NOTE(review): this is the same S3 prefix the raw CSVs are read from
# (recursively) in the Read section, so files written here would be picked up
# by the next read. Consider a dedicated prefix for the cleaned table.
table_location = "s3://cloud-eng-bucket/"
glue_client = boto3.client('glue')

# Spark/Glue type names that differ from the Hive type names used for Data
# Catalog columns; any other name (e.g. "double", "string") passes through.
hive_type_names = {
    "byte": "tinyint",
    "short": "smallint",
    "integer": "int",
    "long": "bigint",
}

schema = churn_clean_dyf.schema()
columns = [
    {
        "Name": field.name,
        "Type": hive_type_names.get(field.dataType.typeName(), field.dataType.typeName())
    }
    for field in schema.fields
]

# Create table configurations: comma-delimited text files whose first line is
# skipped as a header.
create_table_options_streamed = {
    "DatabaseName": database_name,
    "TableInput": {
        "Name": table_name,
        "Description": "Clean data for telecom churn",

        "StorageDescriptor": {
            "Columns": columns,
            "Location": table_location,
            "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
            "Compressed": False,
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                "Parameters": {
                    "field.delim": ",",
                    "skip.header.line.count" : "1"
                }
            }
        },
        "PartitionKeys": []
    }
}

# Create the table only when Glue reports it as missing. Any other failure
# (permissions, throttling, ...) now surfaces instead of being reported as
# "does not exist".
try:
    response = glue_client.get_table(
        DatabaseName=database_name,
        Name=table_name
    )
    print(f"{table_name} already exists. Directly writing...")
except glue_client.exceptions.EntityNotFoundException:
    print(f"{table_name} does not exist. Creating...")
    response_streamed = glue_client.create_table(**create_table_options_streamed)

glueContext.write_dynamic_frame.from_catalog(
    frame = churn_clean_dyf,
    database = database_name,
    table_name = table_name
)

print(f"Sucessfully wrote to {table_name}")

telecom_churn does not exist. Creating...
Sucessfully wrote to telecom_churn
