In [0]:
import argparse
import itertools
import os

import snscrape.modules.twitter as sntwitter
from pyspark.sql import SparkSession


def create_spark_views(customers_location: str, products_location: str, transactions_location: str):
    """Register the three yearly sales workbooks as Spark temp views.

    Each workbook is read with the ``com.crealytics.spark.excel`` data source
    (first row as header, schema inferred). The views are registered as
    ``sales_2017``, ``sales_2018`` and ``sales_2019``, in argument order.

    The parameter names are historical and do not describe their content.
    ``run_transformations`` passes them positionally as the 2017, 2018 and 2019
    workbook locations. The names are kept so existing callers keep working.

    Args:
        customers_location: path/URI of the 2017 sales workbook.
        products_location: path/URI of the 2018 sales workbook.
        transactions_location: path/URI of the 2019 sales workbook.
    """
    # Use the arguments, not module globals. Globals like location_2017 only
    # exist after the __main__ cell has run.
    view_sources = (
        ("sales_2017", customers_location),
        ("sales_2018", products_location),
        ("sales_2019", transactions_location),
    )
    for view_name, location in view_sources:
        (
            spark.read.format("com.crealytics.spark.excel")
            .option("header", "true")
            .option("inferSchema", "true")
            .load(location)
            .createOrReplaceTempView(view_name)
        )

In [0]:
def run_transformations(location_2017: str, location_2018: str, location_2019: str, date_from, date_to, output_location: str, url:str):
    """Run the sales + tweets pipeline end to end.

    Steps run in dependency order:
    1. Register the yearly workbooks as temp views.
    2. Consolidate them and build the aggregate views on top.
    3. Scrape tweets for the date window.
    4. Join the tweets with the best-selling day and export the result.

    Args:
        location_2017: path/URI of the 2017 sales workbook.
        location_2018: path/URI of the 2018 sales workbook.
        location_2019: path/URI of the 2019 sales workbook.
        date_from: start of the tweet search window (passed to ``get_tweets``).
        date_to: end of the tweet search window (passed to ``get_tweets``).
        output_location: base output path for the CSV export.
        url: JDBC URL for the database export.
    """
    create_spark_views(location_2017, location_2018, location_2019)

    # Each builder takes no arguments and issues a create-or-replace temp view
    # statement. Later ones read views produced by earlier ones, so order matters.
    view_builders = (
        union_dataset,
        vendas_ano_mes,
        vendas_marca_linha,
        vendas_marca_ano_mes,
        vendas_linha_ano_mes,
        vendas_linha_dia_ano_mes,
    )
    for build_view in view_builders:
        build_view()

    get_tweets(date_from, date_to)
    tweets_max_sales(output_location, url)

In [0]:
def union_dataset():
    """Stack the three yearly sales views into the ``vendas_consolidadas`` view.

    The query uses ``UNION ALL``, which keeps duplicate rows and matches columns
    by position. The three sheets therefore presumably share one column
    order (TODO: confirm).

    Returns:
        Whatever ``spark.sql`` returns for the ``CREATE VIEW`` statement.
    """
    consolidate_sql = """create or replace temp view vendas_consolidadas as 
                            select * from sales_2017 
                            union all 
                            select * from sales_2018 
                            union all 
                            select * from sales_2019"""
    return spark.sql(consolidate_sql)

In [0]:
def vendas_ano_mes():
    """Create the ``vendas_ano_mes`` view: summed ``qtd_venda`` per year and month.

    Reads ``vendas_consolidadas`` (built by ``union_dataset``). Columns:
    ``valor`` (sum of ``qtd_venda``), ``ano``, ``mes``.

    Returns:
        Whatever ``spark.sql`` returns for the ``CREATE VIEW`` statement.
    """
    # In Spark datetime patterns "M" is month-of-year and "m" is minute-of-hour.
    # The former "m/d/yy" therefore never parsed the month of a string data_venda.
    # If inferSchema already types data_venda as a date/timestamp, the pattern
    # should make no difference.
    result = spark.sql("""
        create or replace temp view vendas_ano_mes as
        select sum(qtd_venda) as valor,
               year(to_date(data_venda, "M/d/yy")) as ano,
               month(to_date(data_venda, "M/d/yy")) as mes
        from vendas_consolidadas
        group by ano, mes
    """)
    return result

In [0]:
def vendas_marca_linha():
    """Create the ``vendas_marca_linha`` view: summed ``qtd_venda`` per brand and product line.

    Reads ``vendas_consolidadas``. Columns: ``valor`` (sum of ``qtd_venda``),
    ``marca``, ``linha``.

    Returns:
        Whatever ``spark.sql`` returns for the ``CREATE VIEW`` statement.
    """
    # The product-line column keeps its own name. It was previously aliased
    # "as mes" (month) by mistake.
    result = spark.sql("""
        create or replace temp view vendas_marca_linha as
        select sum(qtd_venda) as valor, marca, linha
        from vendas_consolidadas
        group by marca, linha
    """)
    return result

In [0]:
def vendas_marca_ano_mes():
    """Create the ``vendas_marca_ano_mes`` view: summed ``qtd_venda`` per brand, year and month.

    Reads ``vendas_consolidadas``. Columns: ``valor`` (sum of ``qtd_venda``),
    ``marca``, ``ano``, ``mes``.

    Returns:
        Whatever ``spark.sql`` returns for the ``CREATE VIEW`` statement.
    """
    # "M" = month-of-year. The former "m" is minute-of-hour in Spark patterns.
    result = spark.sql("""
        create or replace temp view vendas_marca_ano_mes as
        select sum(qtd_venda) as valor,
               marca,
               year(to_date(data_venda, "M/d/yy")) as ano,
               month(to_date(data_venda, "M/d/yy")) as mes
        from vendas_consolidadas
        group by marca, ano, mes
    """)
    return result

In [0]:
def vendas_linha_ano_mes():
    """Create the ``vendas_linha_ano_mes`` view: summed ``qtd_venda`` per product line, year and month.

    Reads ``vendas_consolidadas``. Columns: ``valor`` (sum of ``qtd_venda``),
    ``linha``, ``ano``, ``mes``.

    Returns:
        Whatever ``spark.sql`` returns for the ``CREATE VIEW`` statement.
    """
    # "M" = month-of-year. The former "m" is minute-of-hour in Spark patterns.
    result = spark.sql("""
        create or replace temp view vendas_linha_ano_mes as
        select sum(qtd_venda) as valor,
               linha,
               year(to_date(data_venda, "M/d/yy")) as ano,
               month(to_date(data_venda, "M/d/yy")) as mes
        from vendas_consolidadas
        group by linha, ano, mes
    """)
    return result

In [0]:
def vendas_linha_dia_ano_mes(month: int = 12, year: int = 2019):
    """Create the ``vendas_linha_ano_mes_dia`` view: the top (product line, day) of one month.

    Sums ``qtd_venda`` per ``linha`` and calendar day within the given month
    and year of ``vendas_consolidadas``. Only the single row with the highest
    ``valor`` is kept (``order by valor desc limit 1``); the query does not
    specify which row wins on ties. Columns: ``valor``, ``linha``, ``ano``,
    ``mes``, ``dia``. ``tweets_max_sales`` joins on ``ano``/``mes``/``dia``.

    Args:
        month: month number to filter on (default 12, the previously hard-coded value).
        year: four-digit year to filter on (default 2019, the previously hard-coded value).

    Returns:
        Whatever ``spark.sql`` returns for the ``CREATE VIEW`` statement.
    """
    # "M" = month-of-year. With the former "m" (minute-of-hour) a string
    # data_venda never had its month parsed, so the December filter could not match.
    sale_date = 'to_date(data_venda, "M/d/yy")'
    # int() keeps anything other than plain integers out of the SQL text.
    result = spark.sql(f"""
        create or replace temp view vendas_linha_ano_mes_dia as
        select sum(qtd_venda) as valor,
               linha,
               year({sale_date}) as ano,
               month({sale_date}) as mes,
               day({sale_date}) as dia
        from vendas_consolidadas
        where month({sale_date}) = {int(month)} and year({sale_date}) = {int(year)}
        group by linha, ano, mes, dia
        order by valor desc
        limit 1
    """)
    return result

In [0]:
def get_tweets(date_from, date_to, max_tweets: int = 500):
    """Scrape ``#boticário`` tweets in a date window and register them as Spark views.

    Registers two temp views:

    * ``tweets``: one row per scraped tweet, with columns ``Datetime``,
      ``Tweet Id``, ``Text``, ``Username``, ``language``.
    * ``latest_tweets``: the 50 most recent ``language = "pt"`` tweets plus
      ``ano``/``mes``/``dia`` columns derived from ``Datetime``.

    Args:
        date_from: value placed after ``since:`` in the search query.
        date_to: value placed after ``until:`` in the search query.
        max_tweets: maximum number of tweets to pull from the scraper. The
            previous ``i > 500`` check let 501 through.
    """
    query = f'#boticário since:{date_from} until:{date_to}'
    scraped = sntwitter.TwitterSearchScraper(query).get_items()
    rows = [
        [tweet.date, tweet.id, tweet.content, tweet.user.username, tweet.lang]
        for tweet in itertools.islice(scraped, max_tweets)
    ]

    spark.createDataFrame(rows, ['Datetime', 'Tweet Id', 'Text', 'Username', 'language']).createOrReplaceTempView("tweets")
    # Datetime holds Python datetimes (tweet.date), presumably inferred as a
    # Spark timestamp, so no parse pattern is needed. The former "m/d/yy"
    # pattern ("m" = minute) was misleading.
    spark.sql("""
        create or replace temp view latest_tweets as
        select a.*,
               year(to_date(a.datetime)) as ano,
               month(to_date(a.datetime)) as mes,
               day(to_date(a.datetime)) as dia
        from tweets a
        where a.language = "pt"
        order by a.datetime desc
        limit 50
    """)

In [0]:
def tweets_max_sales(output_location, url, driver: str = "org.mariadb.jdbc.Driver"):
    """Join the best-selling day with same-day tweets and export the result.

    Joins the ``vendas_linha_ano_mes_dia`` and ``latest_tweets`` temp views on
    ``ano``/``mes``/``dia`` and writes the ``username``/``text`` pairs twice:

    * as CSV (``sep=","``, one partition) under ``{output_location}/tweets``;
    * to the JDBC table ``tweets1``.

    Each export is best-effort: an exception is printed and the next export
    still runs.

    Args:
        output_location: base path/URI for the CSV output.
        url: JDBC connection URL.
        driver: JDBC driver class name. This was previously read from a global
            that only existed after the ``__main__`` cell ran. The default is
            that cell's value.
    """
    result = spark.sql("""select b.username, text from vendas_linha_ano_mes_dia a join latest_tweets b on a.dia=b.dia and a.mes=b.mes and a.ano=b.ano""")

    try:
        result.coalesce(1).write.format("csv").option("sep", ",").save(f"{output_location}/tweets")
    except Exception as e:
        print(e)

    try:
        (
            result.write.format("jdbc")
            .option("url", url)
            .option("driver", driver)
            .option("dbtable", "tweets1")
            .save()
        )
    except Exception as e:
        print(e)

In [0]:
if __name__ == "__main__":
    # Notebook kernels often predefine `spark`, but a submitted PySpark job does
    # not. getOrCreate() returns the active session if there is one.
    spark = SparkSession.builder.getOrCreate()

    # Database connection. Environment variables override the fallbacks so
    # credentials need not be committed with the notebook.
    # FIXME: drop the hard-coded password fallback once DB_PASSWORD is provisioned.
    driver = "org.mariadb.jdbc.Driver"
    database_host = os.environ.get("DB_HOST", "35.188.94.247")
    database_port = int(os.environ.get("DB_PORT", "3306"))
    database_name = os.environ.get("DB_NAME", "dados")
    user_name = os.environ.get("DB_USER", "root")
    password = os.environ.get("DB_PASSWORD", "root")
    url = f"jdbc:mysql://{database_host}:{database_port}/{database_name}?user={user_name}&password={password}&allowPublicKeyRetrieval=true&useSSL=false"

    # Input workbooks and output location.
    bucket_input = "gs://playground-s-11-65b238af-data"
    bucket_output = "gs://playground-s-11-65b238af-data-output"
    location_2017 = f"{bucket_input}/Base2017.xlsx"
    location_2018 = f"{bucket_input}/Base_2018.xlsx"
    location_2019 = f"{bucket_input}/Base_2019.xlsx"
    output_location = f"{bucket_output}"

    # Tweet search window.
    date_from = "2019-12-01"
    date_to = "2019-12-31"

    run_transformations(location_2017, location_2018, location_2019, date_from, date_to, output_location, url)
    

In [0]:
# Smoke test: check that data can be written to the Cloud SQL database


In [0]:
# Database connection. Environment variables override the fallbacks so
# credentials need not be committed with the notebook.
# FIXME: drop the hard-coded password fallback once DB_PASSWORD is provisioned.
driver = "org.mariadb.jdbc.Driver"
database_host = os.environ.get("DB_HOST", "35.188.94.247")
database_port = int(os.environ.get("DB_PORT", "3306"))
database_name = os.environ.get("DB_NAME", "dados")
user_name = os.environ.get("DB_USER", "root")
password = os.environ.get("DB_PASSWORD", "root")
url = f"jdbc:mysql://{database_host}:{database_port}/{database_name}?user={user_name}&password={password}&allowPublicKeyRetrieval=true&useSSL=false"

# NOTE(review): `tweets_table` was never defined in this notebook, so this cell
# always failed with a NameError. The raw `tweets` temp view registered by
# get_tweets() is assumed to be the intended source. Confirm.
try:
    tweets_table = spark.table("tweets")
    (
        tweets_table.write.format("jdbc")
        .option("url", url)
        .option("driver", driver)
        .option("dbtable", "tweets")
        .save()
    )
except Exception as e:
    print(e)