In [29]:
from pyspark.sql import SparkSession

#### Iniciando a sessão Spark

In [None]:
# Create (or, via getOrCreate, reuse) the SparkSession named "CreateABT";
# every later cell reads data and runs SQL through this `spark` object.
spark = (
    SparkSession.builder
    .appName("CreateABT")
    .getOrCreate()
)

## Leitura das tabelas e tratamento dos dados
O objetivo desta etapa é ler os dados extraídos do banco de dados e tratá-los: remover os registros duplicados, juntar as tabelas e criar novas variáveis explicativas para enriquecer os dados usados pelos modelos.

In [2]:
# Relative paths of the four raw CSV extracts read in the next cell.
path_daumau, path_desinstalacoes, path_instalacoes, path_ratings = (
    "daumau.csv",
    "desinstalacoes.csv",
    "installs.csv",
    "ratings_reviews.csv",
)

In [108]:
def _read_csv_as_view(path, view_name):
    """Read `path` as CSV (header row, inferred schema) and register it as a temp view.

    The DataFrame is exposed to Spark SQL as `view_name`, marked for caching
    and returned.
    """
    frame = spark.read.csv(path, inferSchema=True, header=True)
    frame.createOrReplaceTempView(view_name)
    frame.cache()
    return frame


# Each count() triggers a Spark action right after cache(), so the cached data is
# computed here; the last count is what the cell displays.
daumau_00 = _read_csv_as_view(path_daumau, "daumau_00")
daumau_00.count()

desinstalacoes_00 = _read_csv_as_view(path_desinstalacoes, "desinstalacoes_00")
desinstalacoes_00.count()

instalacoes_00 = _read_csv_as_view(path_instalacoes, "instalacoes_00")
instalacoes_00.count()

ratings_00 = _read_csv_as_view(path_ratings, "ratings_00")
ratings_00.count()

47155

#### Nestas duas tabelas havia dados duplicados; utilizei o `distinct` para removê-los

In [109]:
def _dedupe_as_view(frame, view_name):
    """Drop fully identical rows from `frame` and register the result as a temp view.

    The de-duplicated DataFrame is exposed to Spark SQL as `view_name`, marked
    for caching and returned.
    """
    deduped = frame.dropDuplicates()  # no subset => same rows as distinct()
    deduped.createOrReplaceTempView(view_name)
    deduped.cache()
    return deduped


instalacoes_01 = _dedupe_as_view(instalacoes_00, "instalacoes_01")
instalacoes_01.count()

daumau_01 = _dedupe_as_view(daumau_00, "daumau_01")
daumau_01.count()

41146

In [61]:
# Base of the ABT: every ratings_00 row, left-joined on (appid, date) with the
# daily `newinstalls` from the de-duplicated instalacoes_01 view. Rows without a
# matching install record keep a NULL newinstalls.
_abt_00_sql = """
    select
        a.date, 
        a.appid, 
        a.category, 
        a.ratings, 
        a.daily_ratings, 
        a.reviews, 
        a.daily_reviews,
        b.newinstalls
    from 
        ratings_00 a
    left join
        instalacoes_01 b
    on (a.appid = b.appid and a.date = b.date)
    


"""
abt_00 = spark.sql(_abt_00_sql)
abt_00.createOrReplaceTempView("abt_00")
abt_00.cache()
abt_00.count()

47155

#### Criei uma variável chamada `diff_install_uninstall`, que é a diferença entre as instalações (`newinstalls`) e as desinstalações (`predictionLoss`) do aplicativo em uma data específica

In [118]:
# Attach the uninstalls column (predictionLoss) from desinstalacoes_00 to every abt_00
# row by (appid, date) and derive diff_install_uninstall = newinstalls - predictionLoss.
# The difference is NULL whenever either operand is NULL, which includes rows with
# no match in desinstalacoes_00 (left join).
abt_01 = spark.sql("""
    select
        a.*,
        b.predictionLoss,
        a.newinstalls - b.predictionLoss as diff_install_uninstall
    from
        abt_00 a
    left join
        desinstalacoes_00 b
    on (a.appid = b.appId and a.date = b.date)
""")
abt_01.createOrReplaceTempView("abt_01")
abt_01.cache()

# Unlike instalacoes_00 and daumau_00, desinstalacoes_00 is joined without
# de-duplication. A repeated (appId, date) key there would silently duplicate ABT
# rows (and inflate every windowed sum built later), so fail fast if the left join
# changed the row count.
abt_00_rows = abt_00.count()
abt_01_rows = abt_01.count()
assert abt_01_rows == abt_00_rows, (
    f"join with desinstalacoes_00 changed row count: {abt_00_rows} -> {abt_01_rows}"
)
abt_01_rows

47155

In [119]:
# Add the target column dauReal from the de-duplicated daumau_01 view, matched on
# (appid, date). The left join keeps every abt_01 row; unmatched rows get a NULL
# dauReal and are filtered out further down.
_abt_02_sql = """

    select
        a.*,
        b.dauReal
    from
        abt_01 a
    left join 
        daumau_01 b
    on (a.appid = b.appId and a.date = b.date)

            """
abt_02 = spark.sql(_abt_02_sql)
abt_02.createOrReplaceTempView("abt_02")
abt_02.cache()
abt_02.count()

47155

#### Neste processo desenvolvo um pequeno book de variáveis, que consiste em criar variáveis a partir de diferentes agregações (soma, média, mínimo e máximo) em diferentes janelas de tempo (15, 30, 45, 60 e 90 dias). Essa é uma técnica muito utilizada em bancos para aproveitar 100% do potencial dos dados do Data Lake.

In [191]:
# Feature "book": for each numeric column, sum/avg/min/max over trailing windows of
# 15, 30, 45, 60 and 90 rows per app (ordered by date), plus an integer code for
# the category.
#
# Fix: the original frames were "rows between N preceding and current row", which
# spans N + 1 rows (e.g. 16 rows for the "_15_" features). Each window now spans
# exactly N rows: the current row plus N - 1 preceding rows.
# NOTE(review): ROWS frames count rows, not calendar days. They match N days only if
# every appid has exactly one row per date with no gaps — confirm, or switch to a
# RANGE frame over a date/timestamp-typed column.
BOOK_WINDOWS = (15, 30, 45, 60, 90)
BOOK_COLUMNS = (
    "daily_ratings",
    "daily_reviews",
    "newinstalls",
    "predictionLoss",
    "diff_install_uninstall",
)
BOOK_AGGREGATIONS = ("sum", "avg", "min", "max")
CATEGORY_CODES = {
    "FINANCE": 1,
    "OTHERS": 2,
    "SHOPPING": 3,
    "BUSINESS": 4,
    "TRAVEL_AND_LOCAL": 5,
    "FOOD_AND_DRINK": 6,
}


def _category_flag_sql(codes):
    """Build a SQL CASE expression aliased `category_flag`.

    Each `category` value in `codes` maps to its integer code. Any other category
    (or NULL) yields NULL, because the CASE has no ELSE branch.
    """
    branches = " ".join(
        f"when category = '{name}' then {code}" for name, code in codes.items()
    )
    return f"case {branches} end as category_flag"


def _window_book_sql(columns, windows, aggregations):
    """Build the comma-separated list of windowed aggregate expressions.

    Output columns are named `<agg>_<window>_<column>` and emitted in
    column -> window -> aggregation order, the same order as the original
    hand-written query. Each frame covers the current row plus the `window - 1`
    preceding rows of the same appid, ordered by date.

    Raises:
        ValueError: if a window size is smaller than 1.
    """
    expressions = []
    for column in columns:
        for window in windows:
            if window < 1:
                raise ValueError(f"window size must be >= 1, got {window}")
            over = (
                "partition by appid order by date "
                f"rows between {window - 1} preceding and current row"
            )
            for agg in aggregations:
                expressions.append(
                    f"{agg}({column}) over ({over}) AS {agg}_{window}_{column}"
                )
    return ",\n        ".join(expressions)


abt_03 = spark.sql(f"""
    select
        a.date,
        a.appid,
        a.category,
        {_category_flag_sql(CATEGORY_CODES)},
        a.ratings,
        a.daily_ratings,
        a.reviews,
        a.daily_reviews,
        a.newinstalls,
        a.predictionLoss,
        {_window_book_sql(BOOK_COLUMNS, BOOK_WINDOWS, BOOK_AGGREGATIONS)},
        a.dauReal
    from abt_02 a
""")
abt_03.createOrReplaceTempView("abt_03")
abt_03.cache()
abt_03.count()

47155

#### Removi as linhas cujo valor do target (`dauReal`) é nulo

In [192]:
# Keep only rows whose target column dauReal is present (filter is an alias of where).
abt_04 = abt_03.filter("dauReal is not null")

#### Salvei os dados em CSV dentro da pasta `abt`

In [195]:
# Write the final ABT as a single CSV part file (with a header row) under `abt/`.
# mode="overwrite" keeps the cell re-runnable: the default save mode
# ("error"/"errorifexists") raises once `abt/` already exists, which breaks
# Restart & Run All.
abt_04.repartition(1).write.csv("abt", mode="overwrite", header=True)