In [1]:
# Load the `sparkver` IPython extension; it provides the %sparkver magic used in the next cell.
%load_ext sparkver

In [2]:
# Select the Spark / py4j versions for this kernel; the magic prints the versions it picked.
%sparkver

Spark version selected: 2.4.7_2.12, py4j version selected: 0.10.7


In [3]:
from IPython.display import HTML, display

# Inject a CSS override so the notebook container uses 90% of the browser width
# (wide result tables are easier to read).
_wide_container_css = HTML("<style>.container { width:90% !important; }</style>")
display(_wide_container_css)

In [4]:
# --- Spark application settings (edit here, not inside the builder chain) ---
APP_NAME = 'piekny notebook'
QUEUE = 'ads_data'            # YARN queue the application is submitted to
EXECUTOR_MEMORY = '8g'
EXECUTOR_OVERHEAD = 2048      # executor memory overhead; Spark reads a bare number as MiB
EXECUTOR_CORES = 8
EXECUTORS = 50                # upper bound for dynamic allocation
DRIVER_MEMORY = '8g'

from pyspark.sql.session import SparkSession
# NOTE: the star import below shadows Python builtins such as `sum`, `min`, `max`
# and `round` with their Spark column equivalents for the rest of the notebook.
from pyspark.sql.functions import *
from time import time, strftime
# `datetime` is bound to the *module* (the notebook calls `datetime.datetime.now()`),
# and `timedelta` is imported by name. The previous
# `from datetime import datetime, timedelta` followed by `import datetime`
# produced the same namespace, but only by silently overwriting the class import.
import datetime
from datetime import timedelta
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark import StorageLevel

# Build (or reuse) the Hive-enabled Spark session on YARN with dynamic allocation.
spark = (SparkSession.builder
    .appName(APP_NAME)
    .config('spark.yarn.queue', QUEUE)
    .config('spark.executor.memory', EXECUTOR_MEMORY)
    .config('spark.executor.cores', EXECUTOR_CORES)
    # Dynamic allocation: start with 8 executors, scale between 1 and EXECUTORS.
    .config('spark.dynamicAllocation.enabled','true')
    .config('spark.dynamicAllocation.initialExecutors', 8)
    .config('spark.dynamicAllocation.minExecutors', 1)
    .config('spark.dynamicAllocation.maxExecutors', EXECUTORS)
    .config('spark.dynamicAllocation.schedulerBacklogTimeout', '3s')
    .config('spark.dynamicAllocation.executorIdleTimeout', '120s')
    # Dynamic allocation requires the external shuffle service.
    .config('spark.shuffle.service.enabled','true')
    .config('spark.shuffle.service.port', 7337)
    # `spark.yarn.executor.memoryOverhead` is deprecated since Spark 2.3;
    # `spark.executor.memoryOverhead` is the current name of the same setting.
    .config('spark.executor.memoryOverhead', EXECUTOR_OVERHEAD)
    .config('spark.yarn.appMasterEnv.JAVA_HOME', '/opt/jre1.8.0')
    .config('spark.executorEnv.JAVA_HOME', '/opt/jre1.8.0')
    .config('spark.speculation', 'true')
    .config('spark.driver.memory', DRIVER_MEMORY)
    # NOTE(review): maxResultSize (20g) is larger than the driver memory (8g), so the
    # driver can run out of memory before this limit is ever hit - confirm intended.
    .config('spark.driver.maxResultSize', '20g')
    .enableHiveSupport()
    .getOrCreate())
print('Application master: http://rm.hadoop.qxlint:8088/proxy/%s' % spark.sparkContext.applicationId)


Application master: http://rm.hadoop.qxlint:8088/proxy/application_1621337340330_389241


In [5]:
import pandas as pd
import numpy as np

In [6]:
# NOTE(review): `requests` is not used in any of the cells visible here - confirm it is still needed.
import requests
# Record the pandas version the notebook was run with.
print(pd.__version__)

0.21.1


In [7]:
# Yesterday's date (local time) as 'YYYY-MM-DD'; used as the v_date snapshot filter below.
yest = (datetime.datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")

In [8]:
def _three_decimals(x):
    """Format a float with exactly three decimal places."""
    return '%.3f' % x


# Pandas display settings: never truncate cell text, show up to 999 columns,
# and render floats with three decimals.
for _option, _value in (
    ('display.max_colwidth', 99999),
    ('display.max_columns', 999),
    ('display.float_format', _three_decimals),
):
    pd.set_option(_option, _value)

In [9]:
# Analysis window: the 7 full days ending yesterday, both bounds inclusive,
# formatted as 'YYYY-MM-DD'. Both bounds are derived from a single clock reading
# so the window is always exactly 7 days, even if the cell runs across midnight.
_window_anchor = datetime.datetime.now()
fromDate = (_window_anchor - timedelta(days=7)).strftime("%Y-%m-%d")
toDate = (_window_anchor - timedelta(days=1)).strftime("%Y-%m-%d")

In [10]:
# Show the analysis window so the rendered notebook records which dates the results cover.
print(fromDate, toDate)

2021-05-20 2021-05-26


In [11]:
# Budget split per ad group (core vs. google share) from the `yest` snapshot of
# ads_prod.adgroups_budget_splits; the nested `split` struct is flattened.
_split_query = """
    select adgroup_id
        , split.core_split core_split
        , split.google_split google_split
        , v_date
    from ads_prod.adgroups_budget_splits
    where v_date = '{}'
""".format(yest)

split_table = spark.sql(_split_query)

In [12]:
# Ad groups whose google split differs from 15, the value the `is_changed`
# flag further down treats as "unchanged".
split_changed = split_table.filter(col('google_split') != 15)

In [13]:
# Spend per ad group inside the analysis window (fromDate..toDate inclusive),
# pivoted into one column per placement bucket: `google` and `rest`.
# Only rows with a positive total_cost are considered.
#
# The pivot values are listed explicitly: Spark then skips the extra job that
# discovers distinct values, and both `google` and `rest` columns always exist
# (null when an ad group has no spend in that bucket) - downstream cells read both.
# With a single aggregate the pivoted columns are named after the pivot values,
# so the `reve_gross` alias does not appear in the output column names.
# `sum` here is pyspark.sql.functions.sum (star import), not the Python builtin.
adgroup_stats = (spark.table('ads_prod.adgroup_stats')
                 .where(col('v_date').between(fromDate, toDate))
                 .where(col('total_cost') > 0)
                 .withColumn('is_google', when(col('placement')=='google', "google").otherwise("rest"))
                 .groupBy('adgroup_id')
                 .pivot('is_google', ['google', 'rest'])
                 .agg(sum('total_cost').alias('reve_gross'))
               )

In [14]:
# GMV per ad group inside the analysis window (fromDate..toDate inclusive),
# pivoted into `google_gmv` and `rest_gmv`. GMV is the sum of direct and
# indirect transaction values taken from the nested `aggregation` struct.
#
# The pivot values are listed explicitly: Spark then skips the extra job that
# discovers distinct values, and both output columns always exist (null when an
# ad group has no GMV in that bucket) - the ROI cell reads both.
# With a single aggregate the pivoted columns are named after the pivot values,
# so the `gmv_7d` alias does not appear in the output column names.
# `sum` here is pyspark.sql.functions.sum (star import), not the Python builtin.
adgroup_gmv = (spark.table('ads_prod.adgroup_gmv')
               .where(col('date').between(fromDate, toDate))
               .withColumn('is_google', when(col('key.placement')=='google', "google_gmv").otherwise("rest_gmv"))
               .groupBy(col('key')['adgroup_id'].alias('adgroup_id'))
               .pivot('is_google', ['google_gmv', 'rest_gmv'])
               .agg(sum(col('aggregation')['direct_transactions_value']
                        + col('aggregation')['indirect_transactions_value']).alias('gmv_7d'))
             )

In [15]:
# Per ad group, order the daily-limit history entries by key, newest first.
window = Window.partitionBy('adgroup_id').orderBy(col('key').desc())

# One row per (ad group, history entry) from the hour-23 dump of `yest`;
# explode() yields the `key` / `value` columns used below.
_limit_history = (spark.table('ads_prod.mongo_dump_adgroups_parquet')
                  .filter((col('v_date') == yest) & (col('hour') == '23'))
                  .select(col('id').alias('adgroup_id'),
                          explode(col('dailylimithistory'))))

# Keep only the most recent budget entry per ad group.
# NOTE(review): the x7 presumably scales a daily limit to the 7-day analysis
# window - confirm, and keep it in sync with fromDate/toDate.
mda = (_limit_history
       .withColumn('ranking', rank().over(window))
       .filter(col('ranking') == 1)
       .select('adgroup_id',
               col('key').alias('budget_change_date'),
               (col('value.value') * 7).alias('budget')))

In [16]:
# Ad groups that have a split, spend in the window and a budget entry.
_split_spend_budget = (split_table
                       .join(adgroup_stats, on='adgroup_id', how='inner')
                       .join(mda, on='adgroup_id', how='inner'))

# Split the budget between google and core according to the percentage splits.
_with_budgets = (_split_spend_budget
                 .withColumn('google_budget', col('google_split') * col('budget') / 100)
                 .withColumn('core_budget', col('core_split') * col('budget') / 100))

# Budget utilisation: spend divided by the budget allotted to that side.
# NOTE(review): `google` / `rest` are null for ad groups with no spend on that
# side, so their *_percent is null rather than 0 and the percentile aggregates
# below ignore them - confirm that is the intended treatment.
total = (_with_budgets
         .withColumn('google_percent', col('google') / col('google_budget'))
         .withColumn('core_percent', col('rest') / col('core_budget')))

In [17]:
# Number of ad groups that survive the inner joins (this triggers a Spark job).
total.count()

32771

## Ile wczoraj bylo zmienionych adgrup?

In [18]:

# Flag: 0 when the google split still equals 15, 1 for any other value.
_is_changed = when(col('google_split') == 15, 0).otherwise(1).alias('is_changed')

# Count ad groups per flag value.
split_grouped = (split_table
                 .groupBy(_is_changed)
                 .agg(count('adgroup_id').alias('cnt')))

split_grouped.toPandas()

Unnamed: 0,is_changed,cnt
0,1,129
1,0,71865


In [19]:
## ~0.18% jak na razie (129 z 71994 adgrup)

## Procent wykorzystania budzetu

In [20]:
# Budget-utilisation percentiles (25/50/75/99) per google split, for the core
# and google sides. The expressions are generated instead of written out; each
# one is the exact SQL `percentile(<side>_percent, <q>)` aliased `<side>_<pct>`.
budget_change = (total
                 .groupBy('google_split')
                 .agg(*[expr("percentile({}_percent, {})".format(side, pct / 100))
                        .alias("{}_{}".format(side, pct))
                        for side in ('core', 'google')
                        for pct in (25, 50, 75, 99)]))

In [21]:
# Ad groups whose google split we lowered do indeed spend most of their budget in core.
# Ad groups whose google split we raised spend less in core.
# No such relationship is visible for spend in google.
budget_change.toPandas()

Unnamed: 0,google_split,core_25,core_50,core_75,core_99,google_25,google_50,google_75,google_99
0,20,0.088,0.159,0.243,0.404,0.635,0.719,0.774,0.942
1,15,0.031,0.137,0.479,1.219,0.727,0.933,1.015,6.586
2,10,0.221,0.594,0.992,1.324,0.461,0.749,1.126,1.988


## Revenue

In [22]:
# Spend (revenue) percentiles (25/50/75/99) per google split. The `rest` spend
# column is reported under the `core_*` aliases, `google` under `google_*`.
reve_change = (total
               .groupBy('google_split')
               .agg(*[expr("percentile({}, {})".format(spend_col, pct / 100))
                      .alias("{}_{}".format(label, pct))
                      for spend_col, label in (('rest', 'core'), ('google', 'google'))
                      for pct in (25, 50, 75, 99)]))

In [23]:
## Interesting findings: ad groups whose google split we lowered spend more in core (median),
## but we left the biggest spenders untouched (99th percentile).
## The same holds analogously for google.
## Ad groups whose split we raised also spend slightly more in core; it does not work the other way round.
reve_change.toPandas()

Unnamed: 0,google_split,core_25,core_50,core_75,core_99,google_25,google_50,google_75,google_99
0,20,18.665,34.66,112.16,576.35,25.745,46.7,138.635,407.163
1,15,4.68,23.02,86.835,889.386,19.64,26.3,49.13,518.127
2,10,35.47,72.275,115.975,349.197,8.723,14.305,19.593,43.924


## ROI

In [24]:
# Ad groups that have a split, spend, a budget entry AND GMV in the window.
# NOTE(review): the first two joins and the two *_budget columns repeat what
# the `total` cell already builds; consider deriving this frame from `total`.
_split_spend_budget_gmv = (split_table
                           .join(adgroup_stats, on='adgroup_id', how='inner')
                           .join(mda, on='adgroup_id', how='inner')
                           .join(adgroup_gmv, on='adgroup_id', how='inner'))

_roi_with_budgets = (_split_spend_budget_gmv
                     .withColumn('google_budget', col('google_split') * col('budget') / 100)
                     .withColumn('core_budget', col('core_split') * col('budget') / 100))

# ROI = GMV divided by spend, separately for the google and core sides.
total_roi = (_roi_with_budgets
             .withColumn('google_roi', col('google_gmv') / col('google'))
             .withColumn('core_roi', col('rest_gmv') / col('rest')))

In [25]:
# ROI percentiles (25/50/75/99) per google split for the core and google sides,
# plus the number of ad groups in each split bucket.
_roi_percentiles = [expr("percentile({}_roi, {})".format(side, pct / 100))
                    .alias("{}_{}".format(side, pct))
                    for side in ('core', 'google')
                    for pct in (25, 50, 75, 99)]

roi_change = (total_roi
              .groupBy('google_split')
              .agg(*(_roi_percentiles + [count('adgroup_id').alias('cnt')])))

In [26]:
## Something odd here: google ROI of the ad groups whose split we reduced is almost 2x better.
roi_change.toPandas()

Unnamed: 0,google_split,core_25,core_50,core_75,core_99,google_25,google_50,google_75,google_99,cnt
0,20,7.282,11.24,18.544,75.57,2.883,5.715,10.467,28.69,66
1,15,4.875,9.873,21.05,248.2,2.753,5.808,11.984,91.983,22237
2,10,6.451,10.27,21.662,137.873,4.369,9.534,14.869,70.678,61
