In [1]:
import os
import requests
import json
import re
import glob
import numpy as np
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd

In [2]:
# Directory layout for this job: JSON exports are read from RAW and the
# converted Parquet datasets are written under MODELED. Both values keep
# their trailing slash.
RAW, MODELED = './raw/', './modeled/'

In [3]:
# Build the Spark session used by every later cell.
# Each builder method returns the builder itself, so the settings are chained
# into one expression instead of being issued as separate statements.
builder = (
    SparkSession.builder
    .appName("Data scrub")
    .master("local[*]")
    .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
    .config("spark.speculation", "false")
    .config("spark.sql.parquet.compression.codec", "gzip")
    .config("spark.debug.maxToStringFields", "100")
    .config("spark.driver.memory", "9g")
    .config("spark.driver.cores", "4")
    # Spark property names are dot-separated. The previous key,
    # "spark.executor-memory", is not a recognized property (the hyphenated
    # spelling only exists as the spark-submit flag --executor-memory), so the
    # 20g setting was never applied under its intended name.
    .config("spark.executor.memory", "20g")
    .config("spark.executor.cores", "4")
)

# Reuses an already-running session if one exists, otherwise starts a new one.
spark = builder.getOrCreate()

In [4]:
# Echo the session object so the notebook shows it as this cell's output
# (a quick check that the session was created).
spark

In [5]:
# Collect every JSON export found directly inside RAW.
# os.path.join avoids the doubled slash that RAW + '/*.json' produced (RAW
# already ends with '/'), and sorted() makes the processing order reproducible,
# since glob returns matches in arbitrary order.
files = sorted(glob.glob(os.path.join(RAW, '*.json')))

In [6]:
# Echo the discovered paths so the notebook lists the inputs that will be converted.
files

['./raw/aluno.json',
 './raw/curso.json',
 './raw/logIzz.json',
 './raw/log_aula.json',
 './raw/matricula.json',
 './raw/pedido.json',
 './raw/transacao_pagseguro.json',
 './raw/user.json']

In [11]:
# Explicit read schemas, keyed by model name (the JSON file name without its
# extension). Each entry lists (column name, Spark data type) pairs in the
# order the columns should appear; every column is declared nullable.
schema_specs = {
    'aluno': [
        ("cep", StringType()),
        ("cidade", StringType()),
        ("data_nascimento", DateType()),
        ("data_registro", DateType()),
        ("domain", StringType()),
        ("estado", StringType()),
        ("id", DoubleType()),
        ("logradouro", StringType()),
        ("user_id", DoubleType()),
    ],
    'curso': [
        ("created", DateType()),
        ("domain", StringType()),
        ("nome", StringType()),
        ("valor", DoubleType()),
        ("id", DoubleType()),
        ("duracao", DoubleType()),
        ("tipo", StringType()),
    ],
    'logIzz': [
        ("created", TimestampType()),
        ("action", StringType()),
        ("domain", StringType()),
        ("model_id", IntegerType()),
        ("id", IntegerType()),
        ("user_id", IntegerType()),
        ("model", StringType()),
    ],
    'log_aula': [
        ("aluno_id", DoubleType()),
        ("aulas_parte_id", DoubleType()),
        ("data_cadastro", StringType()),
        ("domain", StringType()),
        ("id", DoubleType()),
        ("matricula_id", DoubleType()),
        ("qtd_acesso", DoubleType()),
    ],
    'matricula': [
        ("aluno_id", DoubleType()),
        ("cupom", StringType()),
        ("curso_id", DoubleType()),
        ("curso_valor", DoubleType()),
        ("data_cancelamento", DateType()),
        ("data_expiracao", DateType()),
        ("data_liberacao", DateType()),
        ("data_matricula", DateType()),
        ("domain", StringType()),
        ("id", DoubleType()),
        ("pedido_id", DoubleType()),
        ("status", StringType()),
    ],
    'pedido': [
        ("aluno_id", DoubleType()),
        ("data", TimestampType()),
        ("data_cancelamento", DateType()),
        ("data_liberacao", DateType()),
        ("domain", StringType()),
        ("id", DoubleType()),
        ("status", StringType()),
    ],
    'transacao_pagseguro': [
        ("data_transacao", DateType()),
        ("domain", StringType()),
        ("parcelas", DoubleType()),
        ("pedido_id", DoubleType()),
        ("tipo_pagamento", StringType()),
        ("id", DoubleType()),
        ("transacaoid", StringType()),
        ("status", StringType()),
    ],
    'user': [
        ("acesso", DateType()),
        ("domain", StringType()),
        ("id", IntegerType()),
        ("perfil_id", IntegerType()),
        ("ultimo_ip", StringType()),
    ],
}

# Materialize one StructType per model from the declarative spec above.
schemas = {
    model_name: StructType(
        [StructField(column, dtype, True) for column, dtype in columns]
    )
    for model_name, columns in schema_specs.items()
}

In [14]:
# Convert each raw JSON export into a Parquet dataset under MODELED, reading
# it with the explicit schema registered for its model.
for file in files:
    # The model name is the file name without directory or extension.
    # os.path is used instead of regex substitution: RAW is a filesystem path,
    # not a pattern (its '.' would match any character), and str.replace would
    # strip '.json' anywhere in the name rather than only the trailing extension.
    model = os.path.splitext(os.path.basename(file))[0]

    # Fail with context when a JSON file has no declared schema, rather than
    # surfacing a bare KeyError carrying only the model name.
    if model not in schemas:
        raise KeyError(
            "No schema declared for model {!r} (file {!r})".format(model, file)
        )

    df = spark.read.schema(schemas[model]).json(file)
    # 'overwrite' replaces any existing output, so the cell can be re-run.
    df.write.mode('overwrite').parquet(os.path.join(MODELED, model + ".parquet"))