In [1]:
import io
import sys
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, datediff
from pyspark.sql import functions as F

In [2]:
spark = SparkSession.builder.appName('Project').getOrCreate()

In [3]:
spark

In [4]:
input_path = "C:\\Users\\Al\\Desktop\\stepik-ds-course\\Week3\\Project\\clickstream.parquet"

In [5]:
output_path = "C:\\Users\\Al\\Desktop\\stepik-ds-course\\Week3\\Project\\result\\"

In [6]:
df = spark.read.parquet(input_path)

In [54]:
df.show(3)

+----------+-------------------+-----+--------+------+---------------+-----------------+------------+-------+---------+---------------------+
|      date|               time|event|platform| ad_id|client_union_id|compaign_union_id|ad_cost_type|ad_cost|has_video|target_audience_count|
+----------+-------------------+-----+--------+------+---------------+-----------------+------------+-------+---------+---------------------+
|2019-04-01|2019-04-01 04:00:48| view| android| 45061|          34734|            45061|         CPM|  200.6|        0|              1955269|
|2019-04-01|2019-04-01 04:00:48| view|     web|121288|         121288|           121288|         CPM|  187.4|        0|               232011|
|2019-04-01|2019-04-01 04:01:03| view| android|102737|         102535|           102564|         CPC|   60.7|        0|                 4410|
+----------+-------------------+-----+--------+------+---------------+-----------------+------------+-------+---------+---------------------+
only s

In [8]:
df.agg(F.countDistinct("date")).show()

+-----------+
|count(date)|
+-----------+
|          6|
+-----------+



In [9]:
df.dtypes

[('date', 'date'),
 ('time', 'timestamp'),
 ('event', 'string'),
 ('platform', 'string'),
 ('ad_id', 'int'),
 ('client_union_id', 'int'),
 ('compaign_union_id', 'int'),
 ('ad_cost_type', 'string'),
 ('ad_cost', 'double'),
 ('has_video', 'int'),
 ('target_audience_count', 'decimal(10,0)')]

date               | день, в который происходят события
time	           | точное время события
event	           | тип события, может быть или показ или клик по рекламе
platform	       | платформа, на которой произошло рекламное событие
ad_id	           | id рекламного объявления
client_union_id	id |рекламного клиента
campaign_union_id  | id рекламной кампании
ad_cost_type	   | тип объявления с оплатой за клики (CPC) или за показы (CPM)
ad_cost	           | стоимость объявления в рублях, для CPC объявлений - это цена за клик, для CPM - цена за 1000 показов
has_video	       | есть ли у рекламного объявления видео
target_audience_count | 	размер аудитории, на которую таргетируется объявление


In [10]:
df.count()

1000000

In [11]:
splits = df.randomSplit([0.75, 0.25])

In [12]:
splits[1].count() #проверяем что разделилось на части

250358

In [13]:
splits[0].write.parquet(output_path + "train\\")
splits[1].write.parquet(output_path + "test\\")

*Надо:*
ad_id	integer	      id рекламного объявления
target_audience_count decimal размер аудитории, на которую таргетируется объявление
has_video  integer	    1 если есть видео, иначе 0
- is_cpm	integer	    1 если тип объявления CPM, иначе 0
- is_cpc	integer	    1 если тип объявления CPC, иначе 0
ad_cost	double	    стоимость объявления в рублях
- day_count  integer	Число дней, которое показывалась реклама
+ CTR	  double	     Отношение числа кликов к числу просмотров

количество дней показа по объявлению 

In [14]:
ad_days_count_df = df.filter(df['event'] == 'view') \
                    .groupBy('ad_id') \
                    .agg(F.countDistinct("date"))\
                    .withColumnRenamed('count(date)', 'days_count')
#ad_days_count_df.show()

CTR (отношение числа кликов к числу просмотров)

In [15]:
#SparkSQL with SQL exprs:
df.createOrReplaceTempView('clickstream') # регистрируем DF как таблицу
ctr_df = spark.sql("select ad_id, views, clicks, clicks / views as ctr \
                                from (select ad_id,\
                                             count(CASE WHEN event = 'view' THEN 1 END)  as views,\
                                             count(CASE WHEN event = 'click' THEN 1 END) as clicks \
                                      from clickstream \ 
                                      group by ad_id \
                                      having views != 0)").show()

SyntaxError: EOL while scanning string literal (<ipython-input-15-126eb3745c13>, line 7)

In [None]:
#spark sql without sql expr:
count_cond_lambda = lambda cond: F.sum(F.when(cond, 1).otherwise(0))

ctr_df = df.groupBy('ad_id') \
           .agg(count_cond_lambda(F.col('event') == 'view').alias('views'), \
                count_cond_lambda(F.col('event') == 'click').alias('clicks')) \
           .select('ad_id', F.expr("clicks / views").alias('CTR')) 
ctr_df.show()          

is_cpm и is_cpc:

In [None]:
#ndf = df.withColumn('is_cpm', int(col('ad_cost_type') == 'CPM'))\
#        .withColumn('is_cpc', int(col('ad_cost_type') == 'CPC') )

In [None]:
type(ctr_df)

In [None]:

ndf = df.select(['ad_id', \
                 'target_audience_count',  \
                 'has_video', \
                 (col('ad_cost_type') == 'CPM').cast('integer').alias('is_cpm'), \
                 (col('ad_cost_type') == 'CPC').cast('integer').alias('is_cpc'), \
                 'ad_cost']) \
        .join(ad_days_count_df, 'ad_id', 'left') \
        .join(ctr_df, 'ad_id', 'left') 
ndf.show()


In [None]:
ndf.count()

In [None]:
splits = ndf.randomSplit([0.75, 0.25])
splits[0].write.parquet(output_path + "train\\")
splits[1].write.parquet(output_path + "test\\")

In [None]:
splits[1].count()

In [17]:
df.createOrReplaceTempView('clickstream') # регистрируем DF как таблицу
ctr_df = spark.sql("select ad_id, \
                               count(CASE WHEN event = 'view' THEN 1 END) as views, \
                               count(CASE WHEN event = 'click' THEN 1 END) / count(CASE WHEN event = 'view' THEN 1 END) as ctr \
                        from clickstream \
                        group by ad_id \
                        having views != 0")
ctr_df.show()

+------+-----+--------------------+
| ad_id|views|                 ctr|
+------+-----+--------------------+
| 47217|   22|0.045454545454545456|
| 40515|  140| 0.02857142857142857|
| 33412|   35|                 0.0|
| 33602|  480|             0.01875|
| 20596|  641|  0.0062402496099844|
|119169|  636|0.007861635220125786|
|116158| 5589|                 0.0|
| 46938|   47| 0.02127659574468085|
|114166|   26|                 0.0|
| 43921|   22|                 0.0|
| 98184|   37| 0.02702702702702703|
| 39433|   28|                 0.0|
| 34713|  103|                 0.0|
| 20467|   88|0.011363636363636364|
| 15162|   81|                 0.0|
| 18759|   95|                 0.0|
| 45769|  185|                 0.0|
| 38651|   33|                 0.0|
|110871|  106|0.009433962264150943|
| 37864|  102|                 0.0|
+------+-----+--------------------+
only showing top 20 rows



In [20]:
count_cond_lambda = lambda cond: F.sum(F.when(cond, 1).otherwise(0))
    
# Вариант без SQL кода:
ctr_df = df.groupBy('ad_id') \
               .agg(count_cond_lambda(F.col('event') == 'view').alias('views'), \
                    count_cond_lambda(F.col('event') == 'click').alias('clicks')) \
               .select('ad_id', F.expr("clicks / views").alias('CTR')) # селектим только CTR, чтобы в итоговой выборке не дропать clicks и views
ctr_df.show()

+------+--------------------+
| ad_id|                 CTR|
+------+--------------------+
| 47217|0.045454545454545456|
| 40515| 0.02857142857142857|
| 33412|                 0.0|
| 33602|             0.01875|
| 20596|  0.0062402496099844|
|119169|0.007861635220125786|
|116158|                 0.0|
| 46938| 0.02127659574468085|
|114166|                 0.0|
| 43921|                 0.0|
| 98184| 0.02702702702702703|
| 39433|                 0.0|
| 34713|                 0.0|
| 20467|0.011363636363636364|
| 15162|                 0.0|
| 18759|                 0.0|
| 45769|                 0.0|
| 38651|                 0.0|
|110871|0.009433962264150943|
| 37864|                 0.0|
+------+--------------------+
only showing top 20 rows



In [68]:

# подсчет ctr
def get_ctr_df(df):
    count_cond_lambda = lambda cond: F.sum(F.when(cond, 1).otherwise(0))
    
    # Вариант без SQL кода:
    ctr_df = df.groupBy('ad_id') \
               .agg(count_cond_lambda(F.col('event') == 'view').alias('views'), \
                    count_cond_lambda(F.col('event') == 'click').alias('clicks'), \
                   F.countDistinct(F.when(F.col('event') == 'view', col('date'))).alias('days_count'))\
               .select('ad_id', F.expr("clicks / views").alias('CTR'), 'days_count') # селектим только CTR, чтобы в итоговой выборке не дропать clicks и views
    ctr_df.cache()
    return ctr_df


# выбираем нужные поля из старого df и вычисляем необходимые
ndf = df.select(['ad_id', 
                     'target_audience_count',  
                     'has_video', 
                     (col('ad_cost_type') == 'CPM').cast('integer').alias('is_cpm'), 
                     (col('ad_cost_type') == 'CPC').cast('integer').alias('is_cpc'), 
                     'ad_cost']) \
            .join(get_ctr_df(df), 'ad_id', 'left')
    # делим df на тренирующую и тестовую выборки
ndf.show(20)
ndf.count()                  

+------+---------------------+---------+------+------+-------+--------------------+----------+
| ad_id|target_audience_count|has_video|is_cpm|is_cpc|ad_cost|                 CTR|days_count|
+------+---------------------+---------+------+------+-------+--------------------+----------+
| 45061|              1955269|        0|     1|     0|  200.6|0.007547169811320755|         2|
|121288|               232011|        0|     1|     0|  187.4|                 0.0|         2|
|102737|                 4410|        0|     0|     1|   60.7|                 0.0|         2|
|107564|                62711|        0|     1|     0|  217.3|0.011627906976744186|         2|
|  4922|              1183501|        0|     0|     1|   60.1|0.013651877133105802|         2|
| 10325|                20779|        0|     1|     0|  211.7|                 0.0|         2|
| 41458|                 6864|        0|     1|     0|  205.8|0.024752475247524754|         2|
| 45831|               132960|        1|     1|   

1000000

Многие преобразования можно было бы выполнить в рамках одной группировки с применением нескольких агрегирующих функций. Попробуйте уменьшить число join операции, применяя множественную агрегацию. Пример: from pyspark.sql.functions import min, max, col df.groupBy('taxi_id') \ .agg(min(col('trip_start_timestamp')), max(col('trip_start_timestamp'))) \ .show() Это поможет сразу за одну агрегацию наити все колонки. 

In [90]:
# выбираем нужные поля из старого df и вычисляем необходимые
ndf = df.groupBy('ad_id') \
        .agg(F.max('target_audience_count').alias('target_audience_count'), \
            count_cond_lambda(F.col('event') == 'view').alias('views'), \
             count_cond_lambda(F.col('event') == 'click').alias('clicks'), \
             F.countDistinct(F.when(F.col('event') == 'view', col('date'))).alias('days_count'))       

    
    # делим df на тренирующую и тестовую выборки
ndf.show(20)
ndf.count()


+------+---------------------+-----+------+----------+
| ad_id|target_audience_count|views|clicks|days_count|
+------+---------------------+-----+------+----------+
| 47217|                 7121|   22|     1|         2|
| 40515|                11533|  140|     4|         2|
| 33602|              3277386|  480|     9|         2|
| 33412|                 7195|   35|     0|         2|
| 20596|              1106999|  641|     4|         2|
| 46938|                23187|   47|     1|         2|
|119169|                35019|  636|     5|         2|
| 43921|                 7807|   22|     0|         1|
|116158|             46707392| 5589|     0|         2|
|114166|                 7350|   26|     0|         2|
| 20467|              3388754|   88|     1|         2|
| 98184|                 1231|   37|     1|         2|
| 39433|                 6759|   28|     0|         2|
| 15162|             32214433|   81|     0|         2|
| 34713|               139368|  103|     0|         2|
| 45769|  

965

In [135]:
nndf = df.select(['ad_id', 
                  'target_audience_count',  
                  'has_video', 
                  'event', 
                  'date',
                  (col('ad_cost_type') == 'CPM').cast('integer').alias('is_cpm'), 
                  (col('ad_cost_type') == 'CPC').cast('integer').alias('is_cpc'), 
                  'ad_cost']) \
        .groupBy('ad_id', 'target_audience_count', 'has_video', 'is_cpm', 'is_cpc', 'ad_cost') \
        .agg(count_cond_lambda(F.col('event') == 'view').alias('views'), \
             count_cond_lambda(F.col('event') == 'click').alias('clicks'), \
             F.countDistinct(F.when(F.col('event') == 'view', col('date'))).alias('day_count')) \
         .select(['*', F.expr("clicks / views").alias('CTR')]) \
         .drop('views', 'clicks')
nndf.show()
nndf.count()

+------+---------------------+---------+------+------+-------+----------+--------------------+
| ad_id|target_audience_count|has_video|is_cpm|is_cpc|ad_cost|days_count|                 CTR|
+------+---------------------+---------+------+------+-------+----------+--------------------+
| 32798|                 1377|        0|     1|     0|  199.2|         2|0.043478260869565216|
|120382|                  640|        0|     1|     0|  204.9|         2|                 0.0|
| 43506|                38220|        0|     0|     1|   52.2|         2|                 0.0|
| 37809|               772494|        0|     1|     0|  192.0|         2|0.013836477987421384|
| 17920|                11710|        0|     1|     0|  204.2|         2|0.014084507042253521|
| 13856|               217626|        0|     0|     1|   48.2|         2|0.012539184952978056|
|108836|               253481|        0|     1|     0|  202.4|         2|                 0.0|
| 45999|              1153387|        0|     1|   

965

In [137]:
def get_ctr_df(df):
    count_cond_lambda = lambda cond: F.sum(F.when(cond, 1).otherwise(0))
    
    # Вариант без SQL кода:
    ctr_df = df.groupBy('ad_id') \
               .agg(count_cond_lambda(F.col('event') == 'view').alias('views'), \
                    count_cond_lambda(F.col('event') == 'click').alias('clicks'), \
                    F.countDistinct(F.when(F.col('event') == 'view', col('date'))).alias('day_count')) \
               .select('ad_id', 'day_count', F.expr("clicks / views").alias('CTR')) # селектим только CTR, чтобы в итоговой выборке не дропать clicks и views
    return ctr_df
    
ndf = df.select(['ad_id', 
                     'target_audience_count',  
                     'has_video', 
                     (col('ad_cost_type') == 'CPM').cast('integer').alias('is_cpm'), 
                     (col('ad_cost_type') == 'CPC').cast('integer').alias('is_cpc'), 
                     'ad_cost']) \
        .join(get_ctr_df(df), 'ad_id', 'left')


In [138]:
count_cond_lambda = lambda cond: F.sum(F.when(cond, 1).otherwise(0))
    
ndf = df.select(['ad_id', 
                     'target_audience_count',  
                     'has_video', 
                     'event', 
                     'date',
                     (col('ad_cost_type') == 'CPM').cast('integer').alias('is_cpm'), 
                     (col('ad_cost_type') == 'CPC').cast('integer').alias('is_cpc'), 
                     'ad_cost']) \
            .groupBy('ad_id', 'target_audience_count', 'has_video', 'is_cpm', 'is_cpc', 'ad_cost') \
            .agg(count_cond_lambda(F.col('event') == 'view').alias('views'), \
                 count_cond_lambda(F.col('event') == 'click').alias('clicks'), \
                 F.countDistinct(F.when(F.col('event') == 'view', col('date'))).alias('day_count')) \
            .select([ 
                     'ad_id', 
                     'target_audience_count',  
                     'has_video', 
                     'is_cpm',
                     'is_cpc',
                     'ad_cost',
                     'day_count',
                     F.expr("clicks / views").alias('CTR')]) \


AnalysisException: cannot resolve '`event`' given input columns: [ad_cost, ad_id, clicks, day_count, has_video, is_cpc, is_cpm, target_audience_count, views];;
'Project [ad_id#4, target_audience_count#10, has_video#9, 'event, 'date, is_cpm#3501, is_cpc#3502, ad_cost#8, day_count#3523L, (cast(clicks#3522L as double) / cast(views#3520L as double)) AS CTR#3534]
+- Aggregate [ad_id#4, target_audience_count#10, has_video#9, is_cpm#3501, is_cpc#3502, ad_cost#8], [ad_id#4, target_audience_count#10, has_video#9, is_cpm#3501, is_cpc#3502, ad_cost#8, sum(cast(CASE WHEN (event#2 = view) THEN 1 ELSE 0 END as bigint)) AS views#3520L, sum(cast(CASE WHEN (event#2 = click) THEN 1 ELSE 0 END as bigint)) AS clicks#3522L, count(distinct CASE WHEN (event#2 = view) THEN date#0 END) AS day_count#3523L]
   +- Project [ad_id#4, target_audience_count#10, has_video#9, event#2, date#0, cast((ad_cost_type#7 = CPM) as int) AS is_cpm#3501, cast((ad_cost_type#7 = CPC) as int) AS is_cpc#3502, ad_cost#8]
      +- Relation[date#0,time#1,event#2,platform#3,ad_id#4,client_union_id#5,compaign_union_id#6,ad_cost_type#7,ad_cost#8,has_video#9,target_audience_count#10] parquet
