In [None]:
#############################
##  SCRIPT LOCAL PARA ETL  ##
#############################





#importando modulos
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# Local development session that runs Spark in-process on every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

# Emoticon vocabularies: a token built from one of "happy" marks a tweet as
# positive, one built from "sad" marks it as negative.
happy = [
    ":)", ":}", ":]", ":D", ":p",
    ":-)", ";-)", ":P", ";)",
]
sad = [
    ":(", ":{", ":[",
    ":-(", ";-(", ";(", ":/",
]

def buscaPositivo(palavra, emoticons=None):
    """Return True if *palavra* is a positive emoticon, possibly with repeated characters.

    A token matches an emoticon ``e`` when it contains ``e`` and every one of
    its characters also occurs in ``e``. So ``":)"`` and ``":))"`` match
    ``":)"``, but ``"oi:)"`` does not.

    Args:
        palavra: A single whitespace-free token. ``None`` or ``""`` gives False.
        emoticons: Emoticons to match against. Defaults to the module-level
            ``happy`` list.

    Returns:
        bool: True on a match, otherwise False (never ``None``).
    """
    if not palavra:
        return False
    if emoticons is None:
        emoticons = happy
    # An exact match (palavra == e) already satisfies this test, so no separate
    # membership check is needed.
    return any(e in palavra and set(palavra) <= set(e) for e in emoticons)
    
def buscaNegativo(palavra, emoticons=None):
    """Return True if *palavra* is a negative emoticon, possibly with repeated characters.

    A token matches an emoticon ``e`` when it contains ``e`` and every one of
    its characters also occurs in ``e``. So ``":("`` and ``":(("`` match
    ``":("``, but ``"oi:("`` does not.

    Args:
        palavra: A single whitespace-free token. ``None`` or ``""`` gives False.
        emoticons: Emoticons to match against. Defaults to the module-level
            ``sad`` list.

    Returns:
        bool: True on a match, otherwise False (never ``None``).
    """
    if not palavra:
        return False
    if emoticons is None:
        emoticons = sad
    # An exact match (palavra == e) already satisfies this test, so no separate
    # membership check is needed.
    return any(e in palavra and set(palavra) <= set(e) for e in emoticons)

    
def buscarSentimento(string):
    """Classify a text by the first emoticon token it contains.

    Tokens are scanned left to right. The first positive emoticon gives
    "Positivo" and the first negative one gives "Negativo". Text with no
    emoticon, including empty text and ``None`` (a null column value when
    called as a Spark UDF), is "Neutro".

    Args:
        string: The tweet text, or ``None``.

    Returns:
        str: "Positivo", "Negativo" or "Neutro".
    """
    # Treat None as empty text so a null tweet does not crash the Spark job;
    # an empty token list falls through to "Neutro".
    for palavra in (string or "").split():
        if buscaPositivo(palavra):
            return "Positivo"
        if buscaNegativo(palavra):
            return "Negativo"
    return "Neutro"

def buscarSimbolo(string):
    """Return the first emoticon token (positive or negative) in a text.

    Args:
        string: The tweet text, or ``None`` (a null column value when called
            as a Spark UDF).

    Returns:
        str: The emoticon token exactly as written in the text, or "" when
        there is none (including empty or ``None`` text).
    """
    # Treat None as empty text so a null tweet does not crash the Spark job.
    for palavra in (string or "").split():
        if buscaPositivo(palavra) or buscaNegativo(palavra):
            return palavra
    return ""

# Register the helper functions as Spark UDFs. No return type is passed, so
# Spark uses its default (string) for every result.
buscarSentimentoUDF = spark.udf.register("buscarSentimentoUDF", buscarSentimento)
buscarSimboloUDF = spark.udf.register("buscarSimboloUDF", buscarSimbolo)
buscaPositivoUDF = spark.udf.register("buscaPositivoUDF", buscaPositivo)
buscaNegativoUDF = spark.udf.register("buscaNegativoUDF", buscaNegativo)


# Read the tab-separated file into a DataFrame. No header option is set, so
# Spark names the columns positionally (_c0, _c1, _c2, ...).
df = spark.read.option("delimiter", "\t") \
    .option("inferSchema", True) \
    .csv("rawdata.csv")


# Give the positional columns meaningful names.
df = df.withColumnRenamed("_c0", "id") \
    .withColumnRenamed("_c1", "tweet_text") \
    .withColumnRenamed("_c2", "tweet_date")

# Convert tweet_date from a dd-MM-yyyy string to a date column.
df = df.select(col("id"), col("tweet_text"), to_date(col("tweet_date"), "dd-MM-yyyy").alias("tweet_date"))

# Add the emoticon-based sentiment column ("Positivo"/"Negativo"/"Neutro").
df = df.withColumn("sentimento", buscarSentimentoUDF(col("tweet_text")))

# Add the emoticon token that decided the sentiment ("" when there is none).
df = df.withColumn("simbolo", buscarSimboloUDF(col("tweet_text")))

# Derive string year/month/day columns from tweet_date.
df = df\
    .withColumn('year',
                 date_format(col("tweet_date"), "yyyy"))\
    .withColumn('month',
                 date_format(col("tweet_date"), "MM"))\
    .withColumn('day',
                 date_format(col("tweet_date"), "dd"))

# Inspect the result. printSchema is a method and must be called to print.
df.show(75)
df.printSchema()



In [None]:
#############################
##   SCRIPT PARA O GLUE    ##
#############################




#########################################
### IMPORT LIBRARIES AND SET VARIABLES
#########################################
 
#Import python modules
from datetime import datetime
 
#Import pyspark modules
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
 
#Import glue modules
#from awsglue.utils import getResolvedOptions
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
#from awsglue.job import Job
 
# Initialize contexts and session. Use the SparkSession that belongs to the
# GlueContext. A separate SparkSession.builder.master("local[*]") call would
# reuse the SparkContext that already exists, so the "local[*]" master could
# not take effect and would only mislead readers.
spark_context = SparkContext.getOrCreate()
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session

# Parameters: input file and output prefix on S3.
s3_read_path = "s3://rawdesafioxpto/tweets_xpto.csv"
s3_write_path = "s3://refdesafioxpto/write"
 
#########################################
### EXTRACT (READ DATA)
#########################################
 
# Log the starting time.
dt_start = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print("Start time:", dt_start)

# Read the tab-separated file. No header option is set, so the columns are
# named positionally (_c0, _c1, _c2, ...).
df = spark.read.csv(s3_read_path, sep='\t', inferSchema=True, encoding="utf-8")
 
#########################################
### TRANSFORM (MODIFY DATA)
#########################################
 
# Emoticon vocabularies: a token built from one of "happy" marks a tweet as
# positive, one built from "sad" marks it as negative.
happy = [
    ":)", ":}", ":]", ":D", ":p",
    ":-)", ";-)", ":P", ";)",
]
sad = [
    ":(", ":{", ":[",
    ":-(", ";-(", ";(", ":/",
]

def buscaPositivo(palavra, emoticons=None):
    """Return True if *palavra* is a positive emoticon, possibly with repeated characters.

    A token matches an emoticon ``e`` when it contains ``e`` and every one of
    its characters also occurs in ``e``. So ``":)"`` and ``":))"`` match
    ``":)"``, but ``"oi:)"`` does not.

    Args:
        palavra: A single whitespace-free token. ``None`` or ``""`` gives False.
        emoticons: Emoticons to match against. Defaults to the module-level
            ``happy`` list.

    Returns:
        bool: True on a match, otherwise False (never ``None``).
    """
    if not palavra:
        return False
    if emoticons is None:
        emoticons = happy
    # An exact match (palavra == e) already satisfies this test, so no separate
    # membership check is needed.
    return any(e in palavra and set(palavra) <= set(e) for e in emoticons)
    
def buscaNegativo(palavra, emoticons=None):
    """Return True if *palavra* is a negative emoticon, possibly with repeated characters.

    A token matches an emoticon ``e`` when it contains ``e`` and every one of
    its characters also occurs in ``e``. So ``":("`` and ``":(("`` match
    ``":("``, but ``"oi:("`` does not.

    Args:
        palavra: A single whitespace-free token. ``None`` or ``""`` gives False.
        emoticons: Emoticons to match against. Defaults to the module-level
            ``sad`` list.

    Returns:
        bool: True on a match, otherwise False (never ``None``).
    """
    if not palavra:
        return False
    if emoticons is None:
        emoticons = sad
    # An exact match (palavra == e) already satisfies this test, so no separate
    # membership check is needed.
    return any(e in palavra and set(palavra) <= set(e) for e in emoticons)

    
def buscarSentimento(string):
    """Classify a text by the first emoticon token it contains.

    Tokens are scanned left to right. The first positive emoticon gives
    "Positivo" and the first negative one gives "Negativo". Text with no
    emoticon, including empty text and ``None`` (a null column value when
    called as a Spark UDF), is "Neutro".

    Args:
        string: The tweet text, or ``None``.

    Returns:
        str: "Positivo", "Negativo" or "Neutro".
    """
    # Treat None as empty text so a null tweet does not crash the Spark job;
    # an empty token list falls through to "Neutro".
    for palavra in (string or "").split():
        if buscaPositivo(palavra):
            return "Positivo"
        if buscaNegativo(palavra):
            return "Negativo"
    return "Neutro"

def buscarSimbolo(string):
    """Return the first emoticon token (positive or negative) in a text.

    Args:
        string: The tweet text, or ``None`` (a null column value when called
            as a Spark UDF).

    Returns:
        str: The emoticon token exactly as written in the text, or "" when
        there is none (including empty or ``None`` text).
    """
    # Treat None as empty text so a null tweet does not crash the Spark job.
    for palavra in (string or "").split():
        if buscaPositivo(palavra) or buscaNegativo(palavra):
            return palavra
    return ""

# Expose the Python helpers as Spark UDFs (no return type given, so Spark
# uses its default string type).
buscarSentimentoUDF = spark.udf.register("buscarSentimentoUDF", buscarSentimento)
buscarSimboloUDF = spark.udf.register("buscarSimboloUDF", buscarSimbolo)
buscaPositivoUDF = spark.udf.register("buscaPositivoUDF", buscaPositivo)
buscaNegativoUDF = spark.udf.register("buscaNegativoUDF", buscaNegativo)

# One lazy chain:
#   1. name the positional columns;
#   2. parse the dd-MM-yyyy date;
#   3. add the sentiment and emoticon columns;
#   4. derive the year/month/day strings used as S3 partition keys.
df = (
    df.withColumnRenamed("_c0", "id")
    .withColumnRenamed("_c1", "tweet_text")
    .withColumnRenamed("_c2", "tweet_date")
    .select(
        col("id"),
        col("tweet_text"),
        to_date(col("tweet_date"), "dd-MM-yyyy").alias("tweet_date"),
    )
    .withColumn("sentimento", buscarSentimentoUDF(col("tweet_text")))
    .withColumn("simbolo", buscarSimboloUDF(col("tweet_text")))
    .withColumn("year", date_format(col("tweet_date"), "yyyy"))
    .withColumn("month", date_format(col("tweet_date"), "MM"))
    .withColumn("day", date_format(col("tweet_date"), "dd"))
)

#########################################
### LOAD (WRITE DATA)
#########################################

# Glue's writer works on DynamicFrames, so wrap the DataFrame first.
dynamic_frame_write = DynamicFrame.fromDF(df, glue_context, "dynamic_frame_write")

# Write Glue Parquet to S3, with one S3 prefix per year/month/day value.
glue_context.write_dynamic_frame.from_options(
    frame=dynamic_frame_write,
    connection_type="s3",
    connection_options={
        "path": s3_write_path,
        "partitionKeys": ["year", "month", "day"],
    },
    format="glueparquet",
)

# Log the end time.
dt_end = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print("End time:", dt_end)

In [None]:
#############################
##    SCRIPT PARA O EMR    ##
#############################

import argparse

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

def particionarDados(data_source, output_uri):
    """Tag tweets with an emoticon-based sentiment and write partitioned Parquet.

    Reads a tab-separated file whose first three columns are taken as the
    tweet id, the tweet text and a ``dd-MM-yyyy`` date. Adds two columns:
    ``sentimento`` ("Positivo"/"Negativo"/"Neutro") and ``simbolo`` (the first
    emoticon token, or ""). Writes Parquet partitioned by year/month/day.

    Args:
        data_source: URI of the tab-separated input, e.g. an S3 object.
        output_uri: URI of the output directory. Existing data there is
            replaced (write mode "overwrite").
    """
    # Leaving the ``with`` block stops the SparkSession.
    with SparkSession.builder.appName("particionarDados").getOrCreate() as spark:

        # EXTRACT: no header option is set, so columns are named _c0, _c1, _c2, ...
        df = spark.read.csv(data_source, sep='\t', inferSchema=True, encoding="utf-8")

        # TRANSFORM
        happy = [":)", ":}", ":]", ":D", ":p", ":-)", ";-)", ":P", ";)"]
        sad = [":(", ":{", ":[", ":-(", ";-(", ";(", ":/"]

        def buscaPositivo(palavra):
            """Return True if the token is a positive emoticon, repeats allowed (":))")."""
            if not palavra:
                return False
            # The token must contain the emoticon and use only its characters.
            return any(e in palavra and set(palavra) <= set(e) for e in happy)

        def buscaNegativo(palavra):
            """Return True if the token is a negative emoticon, repeats allowed (":((")."""
            if not palavra:
                return False
            return any(e in palavra and set(palavra) <= set(e) for e in sad)

        def buscarSentimento(string):
            """Classify text by its first emoticon token; null/empty text is "Neutro"."""
            # None (null tweet) is treated as empty text instead of crashing the job.
            for palavra in (string or "").split():
                if buscaPositivo(palavra):
                    return "Positivo"
                if buscaNegativo(palavra):
                    return "Negativo"
            return "Neutro"

        def buscarSimbolo(string):
            """Return the first emoticon token in the text, or "" if none/null."""
            for palavra in (string or "").split():
                if buscaPositivo(palavra) or buscaNegativo(palavra):
                    return palavra
            return ""

        # Register the helpers as Spark UDFs (default return type: string).
        # The two predicate UDFs are not used below; they are only registered
        # so they remain callable by name from Spark SQL.
        buscarSentimentoUDF = spark.udf.register("buscarSentimentoUDF", buscarSentimento)
        buscarSimboloUDF = spark.udf.register("buscarSimboloUDF", buscarSimbolo)
        spark.udf.register("buscaPositivoUDF", buscaPositivo)
        spark.udf.register("buscaNegativoUDF", buscaNegativo)

        # Give the positional columns meaningful names.
        df = df.withColumnRenamed("_c0", "id") \
            .withColumnRenamed("_c1", "tweet_text") \
            .withColumnRenamed("_c2", "tweet_date")

        # Convert tweet_date from a dd-MM-yyyy string to a date column.
        df = df.select(col("id"), col("tweet_text"), to_date(col("tweet_date"), "dd-MM-yyyy").alias("tweet_date"))

        # Add the sentiment column and the emoticon token that decided it.
        df = df.withColumn("sentimento", buscarSentimentoUDF(col("tweet_text")))
        df = df.withColumn("simbolo", buscarSimboloUDF(col("tweet_text")))

        # Derive string year/month/day columns used as partition keys.
        df = df\
            .withColumn('year',
                 date_format(col("tweet_date"), "yyyy"))\
            .withColumn('month',
                 date_format(col("tweet_date"), "MM"))\
            .withColumn('day',
                 date_format(col("tweet_date"), "dd"))

        # LOAD: Parquet partitioned by year/month/day, replacing existing output.
        df.write.partitionBy("year", "month", "day").mode("overwrite").parquet(output_uri)

if __name__ == "__main__":
    # Both locations are mandatory. Without required=True a missing flag
    # becomes None and only fails later, inside Spark, with an unclear error.
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--data_source', required=True,
        help="The URI of your tab-separated tweets file, like an S3 object location.")
    parser.add_argument(
        '--output_uri', required=True,
        help="The URI where output is saved, like an S3 bucket location.")
    args = parser.parse_args()

    particionarDados(args.data_source, args.output_uri)