In [None]:
# coding: utf-8

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *

import json
import time

In [None]:
from pprint import pprint
from sympy.interactive import printing
# Turn on SymPy's pretty-printing for this session.
# NOTE(review): no SymPy expression is displayed in the visible cells — confirm this is still needed.
printing.init_printing()

In [None]:
"""
    Este trecho de código é necessário apenas quando o notebook é executado em ambiente local
"""

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

# Only needed when the notebook runs locally. getOrCreate() reuses an already
# running session/context instead of raising, so `spark` and `sc` are always
# bound after this cell. The previous `except ValueError: pass` left them
# undefined when a context existed but these names had not been assigned.
spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

### Recuperando schema interface

In [None]:
# Alternative input (a single test instance), kept for reference:
# structured_jsons_path = '../../../../aws_s3/teste/structured_json/checkout/00_CheckoutOrder/001E600A63944702BF4EA7A92FCD3833/id/*'
# Glob pattern selecting the structured JSON files to read.
structured_jsons_path = '../../../../data/s3/checkout/structured_json/{*}CheckoutOrder/*/id/*'
# Read the structured JSON files into a DataFrame.
structured_df = spark.read.json(structured_jsons_path)

# Keep a handle on the schema of the structured data.
schema = structured_df.schema

### Lendo dados

In [None]:
# Alternative input (a single test instance), kept for reference:
# history_datapath = '../../../../aws_s3/teste/history/checkout/001E600A63944702BF4EA7A92FCD3833/id/*'
# Glob pattern selecting the history JSON files to convert.
history_datapath = '../../../../data/s3/checkout/history/YY_CheckoutOrder/*/id/*'
# Read the history JSON files; `df` is the DataFrame transformed by the cells below.
df = spark.read.json(history_datapath)

In [None]:
# Report the total number of rows read. The call form of print gives the same
# output on Python 2 (single argument) and also parses on Python 3.
print("Ao todo há {} observações nos dados".format(df.count()))

### Funções auxiliares

In [None]:
from pyspark.sql.utils import AnalysisException

def has_column(df, col):
    """Tell whether `col` can be resolved on the DataFrame `df`.

    The column is looked up with `df[col]`; an AnalysisException raised by
    that lookup means the column is absent.

    Returns:
        True when the lookup succeeds, False when it raises AnalysisException.
    """
    try:
        df[col]
    except AnalysisException:
        return False
    else:
        return True

### Código para converter os dados com spark usando cast

#### Casting de Items

In [None]:
# Look up the Spark data type of the "Items" field in the structured schema.
# list(...) keeps the result subscriptable on Python 3, where filter() returns
# a lazy iterator; on Python 2 the result is the same list as before.
types = list(filter(lambda f: f.name == "Items", structured_df.schema.fields))
itemType = types[0].dataType

def convert_item(data):
    """Parse the JSON-encoded items and normalise their product categories.

    Each item's 'productCategories' mapping ({id: name}) is replaced by a list
    of {'id': ..., 'name': ...} dicts.

    Args:
        data: JSON string holding a list of item objects, or None/empty.

    Returns:
        The parsed list of items with 'productCategories' rewritten, or None
        when `data` is None/empty (the other converters in this notebook also
        tolerate null inputs). A parsed empty/null payload is returned as is.
    """
    def _parse_product_categories(raw_product_categories):
        # Turn the {id: name} mapping into a list of {'id', 'name'} records.
        return [{'id': k, 'name': v} for k, v in raw_product_categories.items()]

    if not data:
        return None

    items = json.loads(data)
    if not items:
        # JSON null or an empty list: nothing to rewrite.
        return items

    for item in items:
        # `or {}` also covers an explicit `"productCategories": null`.
        raw_categories = item.get('productCategories') or {}
        item['productCategories'] = _parse_product_categories(raw_categories)

    return items

# Register convert_item as a Spark UDF whose declared return type is the
# "Items" type taken from the structured schema.
udf_getData = UserDefinedFunction(convert_item, itemType)

# Replace the "Items" column with the UDF result.
# NOTE(review): unlike the other converted columns, this one is not guarded by has_column().
df = df.withColumn('Items', udf_getData("Items"))

In [None]:
# Measure the wall-clock time taken to display the first 5 rows of "Items".
s = time.time()

df.select("Items").show(5)

e = time.time()

# Call form of print: same output on Python 2, and also valid on Python 3.
print("Tempo executando: {}".format(e-s))

#### Casting de ItemMetadata

In [None]:
# Look up the Spark data type of the "ItemMetadata" field in the structured schema.
# list(...) keeps the result subscriptable on Python 3, where filter() is lazy.
types = list(filter(lambda f: f.name == "ItemMetadata", structured_df.schema.fields))
item_metadata_type = types[0].dataType

def convert_item_metadata(data):
    """Parse the JSON-encoded item metadata and drop 'assemblyOptions'.

    Args:
        data: JSON string of the item metadata, or None/empty.

    Returns:
        The parsed metadata with 'assemblyOptions' removed from every entry of
        its 'items' list; None when `data` is None/empty. A payload that is
        not a JSON object is returned as parsed, untouched.
    """
    if not data:
        return None

    itemmetadata = json.loads(data)
    if not isinstance(itemmetadata, dict):
        return itemmetadata

    # A missing or null "items" key is treated as an empty list instead of
    # raising KeyError/TypeError.
    for item in itemmetadata.get("items") or []:
        if isinstance(item, dict):
            item.pop("assemblyOptions", None)
    return itemmetadata
        
# Register convert_item_metadata as a Spark UDF whose declared return type is
# the "ItemMetadata" type taken from the structured schema.
udf_convert_item_metadata = UserDefinedFunction(convert_item_metadata, item_metadata_type)

# Convert the column only when the history data actually contains it.
if has_column(df, "ItemMetadata"):
    df = df.withColumn("ItemMetadata", udf_convert_item_metadata("ItemMetadata"))

In [None]:
# Measure the wall-clock time taken to display the first 5 rows of "ItemMetadata".
s = time.time()

df.select("ItemMetadata").show(5)

e = time.time()

# Call form of print: same output on Python 2, and also valid on Python 3.
print("Tempo executando: {}".format(e-s))

#### Casting de RatesAndBenefitsData

In [None]:
# Look up the Spark data type of the "RatesAndBenefitsData" field in the structured schema.
# list(...) keeps the result subscriptable on Python 3, where filter() is lazy.
types = list(filter(lambda f: f.name == "RatesAndBenefitsData", structured_df.schema.fields))
rateandbenefits_type = types[0].dataType

def convert_ratesandbenefits(data):
    """Parse the JSON-encoded rates-and-benefits data and drop two sub-keys.

    'matchedParameters' and 'additionalInfo' are removed from every entry of
    the 'rateAndBenefitsIdentifiers' list.

    Args:
        data: JSON string of the rates-and-benefits data, or None/empty.

    Returns:
        The parsed data with the two keys removed; None when `data` is
        None/empty (json.loads used to raise TypeError on a null column
        value). A payload that is not a JSON object is returned as parsed.
    """
    KEY_IDENTIFIERS = "rateAndBenefitsIdentifiers"
    KEY_MATCH_PARAMS = "matchedParameters"
    KEY_ADDINFO = "additionalInfo"

    if not data:
        return None

    data = json.loads(data)
    if not isinstance(data, dict):
        return data

    # A missing or null identifiers list is treated as empty.
    for identifier in data.get(KEY_IDENTIFIERS) or []:
        if isinstance(identifier, dict):
            identifier.pop(KEY_MATCH_PARAMS, None)
            identifier.pop(KEY_ADDINFO, None)
    return data
        
# Register convert_ratesandbenefits as a Spark UDF whose declared return type
# is the "RatesAndBenefitsData" type taken from the structured schema.
udf_convert_ratesandbenefits = UserDefinedFunction(convert_ratesandbenefits, rateandbenefits_type)

# Convert the column only when the history data actually contains it.
if has_column(df, "RatesAndBenefitsData"):
    df = df.withColumn("RatesAndBenefitsData", udf_convert_ratesandbenefits("RatesAndBenefitsData"))

In [None]:
# Measure the wall-clock time taken to display one untruncated row of "RatesAndBenefitsData".
s = time.time()

df.select("RatesAndBenefitsData").show(1, False)

e = time.time()

# Call form of print: same output on Python 2, and also valid on Python 3.
print("Tempo executando: {}".format(e-s))

#### Casting CustomData

In [None]:
from pyspark.sql.utils import AnalysisException
# Count the rows with a non-null CustomData; the query raises
# AnalysisException when the column does not exist in the data.
# Call form of print: same output on Python 2, and also valid on Python 3.
try:
    print(df.where("CustomData is not null").select("CustomData").count())
except AnalysisException:
    print("There is no col CustomData in data")

In [None]:
# Look up the Spark data type of the "CustomData" field in the structured schema.
# list(...) keeps the result subscriptable on Python 3, where filter() is lazy.
types = list(filter(lambda f: f.name == "CustomData", structured_df.schema.fields))
customdata_type = types[0].dataType

def convert_customdata(data):
    """Parse the JSON-encoded custom data and strip the cart extra context.

    For every entry of 'customApps' that has a 'fields' mapping, the
    'cart-extra-context' key is removed from that mapping.

    Args:
        data: JSON string of the custom data; falsy values are passed through.

    Returns:
        `data` itself when it is falsy, otherwise the parsed (and cleaned)
        structure.
    """
    apps_key = "customApps"
    fields_key = "fields"
    extra_context_key = "cart-extra-context"

    parsed = data and json.loads(data)

    # Nothing to clean: falsy input/payload, or no custom apps present.
    if not parsed or apps_key not in parsed:
        return parsed

    apps = parsed[apps_key]
    for position in range(len(apps)):
        app = apps[position]
        if fields_key in app and extra_context_key in app[fields_key]:
            del app[fields_key][extra_context_key]
    return parsed
        
# Register convert_customdata as a Spark UDF whose declared return type is the
# "CustomData" type taken from the structured schema.
udf_convert_customdata = UserDefinedFunction(convert_customdata, customdata_type)


# Convert the column only when the history data actually contains it.
if has_column(df, "CustomData"):
    df = df.withColumn("CustomData", udf_convert_customdata("CustomData"))

In [None]:
# Measure the wall-clock time taken to display the first 5 rows of "CustomData".
s = time.time()

df.select("CustomData").show(5)

e = time.time()

# Call form of print: same output on Python 2, and also valid on Python 3.
print("Tempo executando: {}".format(e-s))

#### Casting Attachment

In [None]:
ATTACHMENT = "attachment"

# Text types accepted by field_cleansing: `str`, plus `unicode` on Python 2
# (on Python 3, type(u"") is simply `str` again).
_STRING_TYPES = (str, type(u""))


def remove_attachments(dic):
    """Recursively delete every key whose name contains "attachment".

    The match is case-insensitive. Nested dicts, and dicts found directly
    inside list values, are cleaned as well.

    Args:
        dic: dict to clean; it is modified in place.

    Returns:
        The same `dic` object, after cleaning.
    """
    # Iterate over a snapshot of the keys because entries are deleted on the way.
    for key in list(dic):
        value = dic[key]
        if ATTACHMENT in key.lower():
            del dic[key]
        elif isinstance(value, dict):
            remove_attachments(value)
        elif isinstance(value, list):
            for item in value:
                if isinstance(item, dict):
                    remove_attachments(item)
    return dic


def field_cleansing(field, cleansing_func=remove_attachments):
    """Parse a JSON string field and clean the resulting dict.

    Args:
        field: the raw field value; only text values are processed.
        cleansing_func: function applied to the parsed value when it is a
            dict (defaults to remove_attachments).

    Returns:
        None when `field` is not a string; the empty string unchanged;
        otherwise the parsed JSON value, cleaned when it is a dict.
    """
    # The previous test `field is not str` compared the value with the `str`
    # type object itself, which is true for every real value, so the function
    # always returned None.
    if not isinstance(field, _STRING_TYPES):
        return None
    structured_field = field and json.loads(field)
    if isinstance(structured_field, dict):
        structured_field = cleansing_func(structured_field)
    return structured_field


In [None]:
# Walk over every column of the current schema:
#  - columns whose name contains "attachment" (case-insensitive) are dropped;
#  - every remaining column whose type is not StringType is passed through a
#    UDF wrapping field_cleansing, declared with the column's own type.
# NOTE(review): field_cleansing parses its input with json.loads, yet it is
# applied here to the NON-string columns — confirm the `!=` condition is intended.
for field in df.schema.fields:
    field_type = field.dataType
    field_name = field.name

    if ATTACHMENT in field_name.lower():
        df = df.drop(field_name)
    elif field_type != StringType():
        udf_get_transform_data = UserDefinedFunction(lambda f: field_cleansing(f), field_type)
        df = df.withColumn(field_name, udf_get_transform_data(field_name))

In [None]:
# Measure the wall-clock time taken to display one untruncated row of the DataFrame.
s = time.time()

df.show(1, False)

e = time.time()

# Call form of print: same output on Python 2, and also valid on Python 3.
print("Tempo executando: {}".format(e-s))

In [None]:
# Bring every row of the DataFrame back to the driver; the returned list is the cell output.
# NOTE(review): on the full dataset this may not fit in driver memory — consider take(n)/count().
df.collect()

#### Particionando dados e escrevendo

In [None]:
def _date_part(lastChange, creationDate, index):
    """Return one '-'-separated component of the date part of a timestamp.

    `lastChange` is used unless it is None, in which case `creationDate` is
    used. The text before the first 'T' is split on '-', and the component at
    `index` is returned (0 = year, 1 = month, 2 = day).

    Returns:
        The component as a string, or None when both inputs are None.
    """
    date = lastChange if lastChange is not None else creationDate
    if date is None:
        # Without this guard None.split(...) raised AttributeError.
        return None
    return date.split('T')[0].split('-')[index]


def getYear(lastChange, creationDate):
    """Year component of lastChange, falling back to creationDate."""
    return _date_part(lastChange, creationDate, 0)


def getMonth(lastChange, creationDate):
    """Month component of lastChange, falling back to creationDate."""
    return _date_part(lastChange, creationDate, 1)


def getDay(lastChange, creationDate):
    """Day component of lastChange, falling back to creationDate."""
    return _date_part(lastChange, creationDate, 2)

# Register the date helpers as Spark UDFs; each one is declared to return a string.
udf_getYear = UserDefinedFunction(getYear, StringType())
udf_getMonth = UserDefinedFunction(getMonth, StringType())
udf_getDay = UserDefinedFunction(getDay, StringType())

In [None]:
# Add the YEAR, MONTH and DAY partition columns, each computed by its UDF
# from the LastChange and CreationDate columns.
for partition_column, partition_udf in (('YEAR', udf_getYear),
                                        ('MONTH', udf_getMonth),
                                        ('DAY', udf_getDay)):
    df = df.withColumn(partition_column, partition_udf(df.LastChange, df.CreationDate))

In [None]:
# Write the table in Parquet format, partitioned by YEAR, MONTH, DAY and InstanceId.
# The 'append' mode adds to whatever already exists at the target path.
# NOTE(review): the active target is a relative placeholder path; the S3
# destination is kept in the commented-out line at the end of this cell.
df.write.partitionBy(['YEAR','MONTH','DAY','InstanceId'])\
    .mode('append')\
    .parquet('../../../../blablabla/')
#     .parquet('s3://vtex.datalake/consumable_tables/')

In [None]:
# Print the schema of the final DataFrame, including the partition columns added above.
df.printSchema()