# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using your notebook, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [None]:
%help

####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse (or create) the SparkContext and wrap it in a GlueContext; the cells
# below use `sc`, `glueContext` and `spark` (the GlueContext's SparkSession).
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
# NOTE(review): `job` is never init()/commit()-ed in this notebook, so Glue job
# bookmarks are not in effect here.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 0.37.0 
Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 3.0
Previous worker type: G.1X
Setting new worker type to: G.1X
Previous number of workers: 5
Setting new number of workers to: 5
Authenticating with environment variables and user-defined glue_role_arn: arn:aws:iam::424875905672:role/glue-cdl-full-access
Trying to create a Glue session for the kernel.
Worker Type: G.1X
Number of Workers: 5
Session ID: f9fd1c36-08b8-4157-a680-dc24157face5
Job Type: glueetl
Applying the following default arguments:
--glue_kernel_version 0.37.0
--enable-glue-datacatalog true
Waiting for session f9fd1c36-0

## Auxiliary functions

In [2]:
from pyspark.sql.types import *
from pyspark import SQLContext

# Legacy SQLContext bound to the session's SparkContext; pandas_to_spark() below
# creates its DataFrames through it (the SparkSession `spark` is the modern API).
sqlContext = SQLContext(sparkContext=sc)
# Auxiliary functions
# pandas -> Spark DataFrame conversion (fast)
def equivalent_type(f):
    """Map a pandas/numpy dtype to the Spark SQL type used for that column.

    :param f: a dtype (e.g. an entry of ``pandas_df.dtypes``) or a dtype name.
    :return: a ``pyspark.sql.types`` instance. Any dtype without an explicit
        mapping (e.g. ``object``) becomes ``StringType()``.
    """
    # The comparisons rely on numpy dtype equality with dtype-name strings,
    # so both np.dtype objects and plain names are accepted.
    if f == 'datetime64[ns]':
        return TimestampType()
    if f in ('int64', 'uint32'):
        # uint32 can exceed the signed 32-bit range, so it needs a 64-bit long.
        return LongType()
    if f in ('int32', 'int16', 'int8', 'uint16', 'uint8'):
        return IntegerType()
    if f == 'float64':
        return DoubleType()
    if f in ('float32', 'float16'):
        return FloatType()
    if f == 'bool':
        # Previously fell through to StringType, turning booleans into strings.
        return BooleanType()
    return StringType()

def define_structure(string, format_type):
    """Build a Spark StructField for one pandas column.

    :param string: the column name.
    :param format_type: the column's pandas/numpy dtype (or dtype name).
    :return: ``StructField(string, <mapped type>)`` (nullable, StructField's
        default); the type falls back to ``StringType()`` if mapping fails.
    """
    try:
        typo = equivalent_type(format_type)
    except Exception:
        # Best-effort fallback to a string column. Catching Exception rather
        # than using a bare except lets KeyboardInterrupt/SystemExit propagate.
        typo = StringType()
    return StructField(string, typo)

# pandas DataFrame -> Spark DataFrame with an explicitly built schema.
def pandas_to_spark(pandas_df):
    """Convert a pandas DataFrame to a Spark DataFrame.

    The schema is assembled column by column from the pandas dtypes via
    ``define_structure``, so Spark does not infer types from the data.

    :param pandas_df: the pandas DataFrame to convert.
    :return: the Spark DataFrame created through the module-level ``sqlContext``.
    """
    schema_fields = [
        define_structure(column_name, column_dtype)
        for column_name, column_dtype in zip(pandas_df.columns, pandas_df.dtypes)
    ]
    return sqlContext.createDataFrame(pandas_df, StructType(schema_fields))




## Read

In [3]:
# Read every CSV object under the S3 prefix (recursively) into a DynamicFrame.
# NOTE(review): Glue's CSV reader takes header handling via
# format_options={"withHeader": True}; "header" is not a documented S3
# connection option. That is presumably why the Transform cell strips the header
# row by hand. Changing it here requires changing that cell as well.
harddrive_source_options = {
    "paths": ["s3://refit-iot/data/harddrive/"],
    "recurse": True,
    "header": "true",
}
harddrive = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options=harddrive_source_options,
    format="csv",
)

# Convert to a Spark DataFrame (all columns arrive as strings).
harddrive_df = harddrive.toDF()




## Transform

In [4]:
# The CSV header line was read as an ordinary data row, so use the first row as
# the column names and drop every row equal to it (presumably one per CSV file,
# since the read recurses over the whole prefix).
# NOTE(review): this round-trips every row through Python RDD serialization.
header = harddrive_df.rdd.first()
data_rows = harddrive_df.rdd.filter(lambda row: row != header)
harddrive_final = spark.createDataFrame(data_rows, header)




In [5]:
from pyspark.sql.functions import col, to_date, to_timestamp

# date: string -> DateType (expects yyyy-MM-dd values).
# Every column after the first three (presumably date, serial_number, model)
# holds numbers read as strings; cast them all to double.
cols_to_cast = harddrive_final.columns[3:]
_cast_to_double = set(cols_to_cast)


def _converted(name):
    """Return the typed expression for one column, keeping its original name."""
    expr = col(name)
    if name == "date":
        expr = to_date(expr, "yyyy-MM-dd")
    if name in _cast_to_double:
        expr = expr.cast("double")
    return expr.alias(name)


# One select instead of a withColumn per column: calling withColumn in a loop
# adds a projection to the plan on every call, which is slow to analyze for
# wide tables like this one.
harddrive_final = harddrive_final.select(*[_converted(name) for name in harddrive_final.columns])




In [6]:
# feature engineering
from pyspark.sql.functions import max
from pyspark.sql.window import Window
from pyspark.sql.functions import col
from pyspark.sql.functions import datediff
import pyspark.sql.functions as F

# Keep only drives that failed at least once, and label every record with the
# number of days until that drive's last observation ("useful_life").
# F.max / F.datediff are used explicitly so this cell does not depend on the
# `from pyspark.sql.functions import max` above, which shadows Python's max().
harddrive_failed = harddrive_final.filter(col("failure") == 1).select("serial_number")

# Inner join on the distinct failed serial numbers restricts the data to those drives.
df_analysis = harddrive_final.join(harddrive_failed.distinct(), on="serial_number", how="inner")

# end_date: the latest observation date per drive.
per_drive = Window.partitionBy("serial_number")
df_analysis = df_analysis.withColumn("end_date", F.max(col("date")).over(per_drive))

# `date` was already converted with to_date in the cast cell, so end_date is a
# DateType as well; no extra to_date cast is needed before datediff.
df_analysis = df_analysis.withColumn("useful_life", F.datediff(col("end_date"), col("date")))




In [7]:
# Drop columns that contain any nulls
def drop_null_columns(df):
    """
    Drop every column that contains at least one null value.

    :param df: A PySpark DataFrame
    :return: ``df`` without the columns that have one or more nulls.
    """
    import pyspark.sql.functions as F
    # One aggregate pass: per column, count the rows in which it is null.
    null_count_exprs = [
        F.count(F.when(F.col(name).isNull(), name)).alias(name)
        for name in df.columns
    ]
    null_counts_row = df.select(null_count_exprs).collect()[0]
    columns_with_nulls = [
        name
        for name, n_nulls in null_counts_row.asDict().items()
        if n_nulls > 0
    ]
    return df.drop(*columns_with_nulls)

harddrive_clean = drop_null_columns(df_analysis)




In [8]:
# Further drop: identifiers, the date columns used to derive useful_life, the
# failure flag, and a hand-picked set of SMART attributes. Spark's drop()
# ignores names that are no longer present (e.g. already removed by
# drop_null_columns).
excluded_columns = [
    'date', 'serial_number', 'model', 'end_date', 'failure',
    'smart_5_normalized', 'smart_198_raw', 'smart_198_normalized',
    'smart_199_normalized', 'smart_241_raw', 'smart_240_raw', 'smart_10_raw',
    'smart_197_normalized', 'smart_188_raw', 'smart_12_normalized',
    'smart_10_normalized', 'smart_7_raw', 'smart_4_normalized',
    'smart_242_raw',
]
harddrive_out = harddrive_clean.drop(*excluded_columns)
harddrive_out.printSchema()

root
 |-- capacity_bytes: double (nullable = true)
 |-- smart_1_normalized: double (nullable = true)
 |-- smart_1_raw: double (nullable = true)
 |-- smart_3_normalized: double (nullable = true)
 |-- smart_3_raw: double (nullable = true)
 |-- smart_4_raw: double (nullable = true)
 |-- smart_5_raw: double (nullable = true)
 |-- smart_7_normalized: double (nullable = true)
 |-- smart_9_normalized: double (nullable = true)
 |-- smart_9_raw: double (nullable = true)
 |-- smart_12_raw: double (nullable = true)
 |-- smart_194_normalized: double (nullable = true)
 |-- smart_194_raw: double (nullable = true)
 |-- smart_197_raw: double (nullable = true)
 |-- smart_199_raw: double (nullable = true)
 |-- useful_life: integer (nullable = true)


In [9]:
# Row count of the model-ready dataset (count() is a Spark action, so this
# executes the whole pipeline above).
harddrive_out.count()

5490


## Write

In [10]:
import boto3
from awsglue.dynamicframe import DynamicFrame

# Convert the Spark DataFrame back to a Glue DynamicFrame for the catalog writer.
harddrive_dyf = DynamicFrame.fromDF(harddrive_out, glueContext, 'convert')

# Target table in the Glue Data Catalog.
database_name = "harddrive"
table_name = "streamed"
glue_client = boto3.client('glue')

# Catalog column definitions, one per DynamicFrame field.
# NOTE(review): the type strings come from Glue's typeName(); confirm they are
# the type names downstream readers of this table expect.
schema = harddrive_dyf.schema()
columns = [
    {
        "Name": field.name,
        "Type": field.dataType.typeName()
    }
    for field in schema.fields
]

# Table definition: comma-delimited text files under the landing prefix, with
# one header line skipped when the table is read.
create_table_options_streamed = {
    "DatabaseName": database_name,
    "TableInput": {
        "Name": table_name,
        "Description": "Streamed data for hard drive failures",

        "StorageDescriptor": {
            "Columns": columns,
            "Location": "s3://refit-iot/final_data_landing/harddrive/streamed/",
            "InputFormat": "org.apache.hadoop.mapred.TextInputFormat",
            "OutputFormat": "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat",
            "Compressed": False,
            "SerdeInfo": {
                "SerializationLibrary": "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
                "Parameters": {
                    "field.delim": ",",
                    "skip.header.line.count" : "1"
                }
            }
        },
        "PartitionKeys": []
    }
}

# Create the table only when Glue reports it missing. Any other get_table
# failure (access denied, throttling, wrong region, ...) propagates instead of
# being misread as "table does not exist".
try:
    glue_client.get_table(DatabaseName=database_name, Name=table_name)
except glue_client.exceptions.EntityNotFoundException:
    print(f"{table_name} does not exist. Creating...")
    glue_client.create_table(**create_table_options_streamed)
else:
    print(f"{table_name} already exists. Directly writing...")

# Write using the catalog table's definition.
glueContext.write_dynamic_frame.from_catalog(
    frame = harddrive_dyf,
    database = database_name,
    table_name = table_name
)

print(f"Sucessfully wrote to {table_name}")

streamed does not exist. Creating...
Sucessfully wrote to streamed
