In [None]:
%%capture
!pip install pyspark

In [None]:
import pyspark.sql.functions as f

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.utils import AnalysisException

from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [None]:
spark = (
    SparkSession
    .builder
    .appName("app")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

#### загрузка 2х таблиц

In [None]:
logs = spark.read.csv(
  '/content/gdrive/MyDrive/Colab Notebooks/pyspark/ch5/files/BroadcastLogs_2018_Q3_M8_sample.CSV',
  sep="|",
  header=True,
  inferSchema=True,
  timestampFormat="yyyy-MM-dd",
)

print(logs.count())
display(logs.show(5))

238945
+--------------+------------+----------+----------+-------------------+----------------------+----------+---------------+-----------------+----------------+---------------+------------------+--------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+
|BroadcastLogID|LogServiceID|   LogDate|SequenceNO|AudienceTargetAgeID|AudienceTargetEthnicID|CategoryID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|ProgramClassID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|
+--------------+------------+----------+----------+------------

None

#### повторяем шаги из 4 главы

In [None]:
logs = logs.drop("BroadcastLogID", "SequenceNO")

logs = logs.withColumn(
    'Duration_seconds',
    f.col('Duration').substr(1,2).cast('int')*60*60 +
    f.col('Duration').substr(4,2).cast('int')*60 +
    f.col('Duration').substr(7,2).cast('int')
)
logs.show(5)

+------------+----------+-------------------+----------------------+----------+---------------+-----------------+----------------+---------------+------------------+--------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+----------------+
|LogServiceID|   LogDate|AudienceTargetAgeID|AudienceTargetEthnicID|CategoryID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|ProgramClassID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|Duration_seconds|
+------------+----------+-------------------+----------------------+----------+---------

In [None]:
log_identifier = spark.read.csv(
    '/content/gdrive/MyDrive/Colab Notebooks/pyspark/ch5/files/LogIdentifier.csv',
    sep="|",
    header=True,
    inferSchema=True,
    timestampFormat="yyyy-MM-dd",
)

print(log_identifier.printSchema())
print(log_identifier.count())
display(log_identifier.show())

root
 |-- LogIdentifierID: string (nullable = true)
 |-- LogServiceID: integer (nullable = true)
 |-- PrimaryFG: integer (nullable = true)

None
920
+---------------+------------+---------+
|LogIdentifierID|LogServiceID|PrimaryFG|
+---------------+------------+---------+
|           13ST|        3157|        1|
|         2000SM|        3466|        1|
|           70SM|        3883|        1|
|           80SM|        3590|        1|
|           90SM|        3470|        1|
|         9DAPTN|        3158|        1|
|         9DCFCF|        3159|        1|
|         9DCFRN|        3160|        1|
|         9DCHRO|        3161|        1|
|         9DCIVI|        3162|        1|
|         9DCJCH|        3163|        1|
|         9DCJOH|        3164|        1|
|         9DCKCK|        3165|        1|
|         9DCKCW|        3166|        1|
|         9DCKVR|        3167|        1|
|          9DCKY|        3168|        1|
|         9DLEAF|        3169|        1|
|          9DMC2|        3170| 

None

In [None]:
logs.select(f.col('LogServiceID')).distinct().count()

324

#### фильтрация данных  
берем только те данные, где PrimaryFG = 1

In [None]:
log_identifier = log_identifier.filter(f.col("PrimaryfG") == 1)
print(log_identifier.count())


758


Объединение таблицы logs и таблицы log_identifier.  
inner - в итоговый df войдут только те объекты, которые содержатся в обоих таблицах

In [None]:
logs.join(
    log_identifier,
    on="LogServiceID",
    how='inner'
).show()

+------------+----------+-------------------+----------------------+----------+---------------+-----------------+----------------+---------------+------------------+--------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+----------------+---------------+---------+
|LogServiceID|   LogDate|AudienceTargetAgeID|AudienceTargetEthnicID|CategoryID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|ProgramClassID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Language2|Duration_seconds|LogIdentifierID|PrimaryFG|
+------------+----------+-----------

#### Если явно задавать условие объединения, то колонка LogServiceID продублируется

In [None]:
logs_and_channels_verbose = logs.join(
    log_identifier,
    on = logs["LogServiceID"] == log_identifier["LogServiceID"]
)
logs_and_channels_verbose.printSchema()

root
 |-- LogServiceID: integer (nullable = true)
 |-- LogDate: date (nullable = true)
 |-- AudienceTargetAgeID: integer (nullable = true)
 |-- AudienceTargetEthnicID: integer (nullable = true)
 |-- CategoryID: integer (nullable = true)
 |-- ClosedCaptionID: integer (nullable = true)
 |-- CountryOfOriginID: integer (nullable = true)
 |-- DubDramaCreditID: integer (nullable = true)
 |-- EthnicProgramID: integer (nullable = true)
 |-- ProductionSourceID: integer (nullable = true)
 |-- ProgramClassID: integer (nullable = true)
 |-- FilmClassificationID: integer (nullable = true)
 |-- ExhibitionID: integer (nullable = true)
 |-- Duration: string (nullable = true)
 |-- EndTime: string (nullable = true)
 |-- LogEntryDate: date (nullable = true)
 |-- ProductionNO: string (nullable = true)
 |-- ProgramTitle: string (nullable = true)
 |-- StartTime: string (nullable = true)
 |-- Subtitle: string (nullable = true)
 |-- NetworkAffiliationID: integer (nullable = true)
 |-- SpecialAttentionID: inte

из-за этого не получается сделать select так как пайспарк не понимает, какую именно мы колонку хотим

In [None]:
try:
    logs_and_channels_verbose.select("LogServiceID")
except AnalysisException as err:
    print(err)

[AMBIGUOUS_REFERENCE] Reference `LogServiceID` is ambiguous, could be: [`LogServiceID`, `LogServiceID`].


варианты как не дублировать столбец  
не задавать явного условия, а просто писать имя клонки

In [None]:
logs_and_channels_verbose = logs.join(log_identifier, "LogServiceID")
logs_and_channels_verbose.select("LogServiceID")

DataFrame[LogServiceID: int]

удаляем одну из колонок, обращаясь к имени старой таблицы вариант более универсальный
pyspark запомнил происхождение обоих колонок LogServiceID, и когда мы хотим удалить колонку log_identifier["LogServiceID"], он он удаляет ту колонку, которая ранее была в log_identifier

In [None]:
logs_and_channels_verbose = logs.join(log_identifier, logs["LogServiceID"] == log_identifier["LogServiceID"])
(
    logs_and_channels_verbose.drop(log_identifier["LogServiceID"])
    .select("LogServiceID")
)

DataFrame[LogServiceID: int]

можно сделать тоже самое через f.col, но требуется присвоить таблицам псевдоним

In [None]:
logs_and_channels_verbose = logs.alias("left").join(
    log_identifier.alias("right"),
    logs["LogServiceID"] == log_identifier["LogServiceID"]
)
logs_and_channels_verbose.drop(f.col("right.LogServiceID")).select("LogServiceID")

DataFrame[LogServiceID: int]

читаем файл CD_Category, берем оттуда нужные колонки

In [None]:
cd_category = spark.read.csv(
    "/content/gdrive/MyDrive/Colab Notebooks/pyspark/ch5/files/CD_Category.csv",
    sep="|",
    header=True,
    inferSchema=True,
)

cd_category = cd_category.select(
    "CategoryID",
    "CategoryCD",
    f.col("EnglishDescription").alias("Category_Description")
)

cd_category.show(5)

+----------+----------+--------------------+
|CategoryID|CategoryCD|Category_Description|
+----------+----------+--------------------+
|         1|       010|                NEWS|
|         2|       02 |CANREC  ANALYSIS ...|
|         3|       02A|ANALYSIS AND INTE...|
|         4|       02B|LONG-FORM DOCUMEN...|
|         5|       030|REPORTING & ACTUA...|
+----------+----------+--------------------+
only showing top 5 rows



читаем файл CD_ProgramClass.csv, берем оттуда нужные колонки

In [None]:
cd_program_class = spark.read.csv(
    "/content/gdrive/MyDrive/Colab Notebooks/pyspark/ch5/files/CD_ProgramClass.csv",
    sep="|",
    header=True,
    inferSchema=True,
)

cd_program_class = cd_program_class.select(
    "ProgramClassID",
    "ProgramClassCD",
    f.col("EnglishDescription").alias("ProgramClass_Description")
)

cd_program_class.show(5)

+--------------+--------------+------------------------+
|ProgramClassID|ProgramClassCD|ProgramClass_Description|
+--------------+--------------+------------------------+
|             1|          AUT |           AUTOPROMOTION|
|             2|          BAL |     BALANCE PROGRAMMING|
|             3|          COM |      COMMERCIAL MESSAGE|
|             4|          COR |             CORNERSTONE|
|             5|          DOC |             DOCUMENTARY|
+--------------+--------------+------------------------+
only showing top 5 rows



Соединяем все таблицы в одну

In [None]:
full_log = (
    logs_and_channels_verbose
    .join(cd_category, "CategoryID", how="left")
    .join(cd_program_class, "ProgramClassID", how="left")
)

full_log.show(5)

+--------------+----------+------------+----------+-------------------+----------------------+---------------+-----------------+----------------+---------------+------------------+--------------------+------------+----------------+----------------+------------+------------+--------------------+----------------+--------+--------------------+------------------+----------------------+-------------+---------+---------+---------+---------+----------------+---------------+------------+---------+----------+--------------------+--------------+------------------------+
|ProgramClassID|CategoryID|LogServiceID|   LogDate|AudienceTargetAgeID|AudienceTargetEthnicID|ClosedCaptionID|CountryOfOriginID|DubDramaCreditID|EthnicProgramID|ProductionSourceID|FilmClassificationID|ExhibitionID|        Duration|         EndTime|LogEntryDate|ProductionNO|        ProgramTitle|       StartTime|Subtitle|NetworkAffiliationID|SpecialAttentionID|BroadcastOriginPointID|CompositionID|Producer1|Producer2|Language1|Langu

#### пример join на простом датасете

In [None]:
data = [
    {"id": 1, "name": "John","name1": "J3"},
    {"id": 2, "name": "dima","name1": "q3"},
    {"id": 3, "name": "saha","name1": "w3"},
    {"id": 4, "name": "den","name1": "r3"},
    {"id": 6, "name": "den","name1": "r3"}
]
df_1 = spark.createDataFrame(data)
df_1.show()

data = [
    {"id": 1, "n": "John11","n1": "gJ3"},
    {"id": 2, "n": "dima11","n1": "gq3"},
    {"id": 3, "n": "saha11","n1": "gw3"},
    {"id": 4, "n": "den11","n1": "gr3"},
    {"id": 4, "n": "111","n1": "111"},
    {"id": 5, "n": "den11","n1": "gr3"}
]
df_2 = spark.createDataFrame(data)
df_2.show()

+---+----+-----+
| id|name|name1|
+---+----+-----+
|  1|John|   J3|
|  2|dima|   q3|
|  3|saha|   w3|
|  4| den|   r3|
|  6| den|   r3|
+---+----+-----+

+---+------+---+
| id|     n| n1|
+---+------+---+
|  1|John11|gJ3|
|  2|dima11|gq3|
|  3|saha11|gw3|
|  4| den11|gr3|
|  4|   111|111|
|  5| den11|gr3|
+---+------+---+



In [None]:
print('left join:')
df_1.join(df_2, how="left", on="id").show()

print('left_semi join:')
print('выводит только те объекты, которые есть и в левом df и в правом, но столбцы только левого df')
df_1.join(df_2, how="left_semi", on="id").show()

print('left_anti join:')
print('выводит только те объекты, которые есть в левом df, но нет в правом')
df_1.join(df_2, how="left_anti", on="id").show()

left join:
+---+----+-----+------+----+
| id|name|name1|     n|  n1|
+---+----+-----+------+----+
|  1|John|   J3|John11| gJ3|
|  2|dima|   q3|dima11| gq3|
|  6| den|   r3|  NULL|NULL|
|  3|saha|   w3|saha11| gw3|
|  4| den|   r3|   111| 111|
|  4| den|   r3| den11| gr3|
+---+----+-----+------+----+

left_semi join:
выводит только те объекты, которые есть и в левом df и в правом, но столбцы только левого df
+---+----+-----+
| id|name|name1|
+---+----+-----+
|  1|John|   J3|
|  2|dima|   q3|
|  3|saha|   w3|
|  4| den|   r3|
+---+----+-----+

left_anti join:
выводит только те объекты, которые есть в левом df, но нет в правом
+---+----+-----+
| id|name|name1|
+---+----+-----+
|  6| den|   r3|
+---+----+-----+



#### Продвинутая группировка данных  
наиболее правильный способ делать через f.func, так как это позволит переименовать столбец  
сгруппировали df по двум столбцам, суммировали duration_seconds, переименовав в duration_total, потом отсортировав

In [None]:
(
    full_log
    .groupby("ProgramClassCD", "ProgramClass_Description")
    .agg(
        f.sum("duration_seconds").alias("duration_total"),
        f.first("duration_seconds").alias("first")
    )
    .orderBy("duration_total", ascending=False).show(1000,False)
 )

+--------------+--------------------------------------+--------------+-----+
|ProgramClassCD|ProgramClass_Description              |duration_total|first|
+--------------+--------------------------------------+--------------+-----+
|PGR           |PROGRAM                               |20992510      |7200 |
|COM           |COMMERCIAL MESSAGE                    |3519163       |15   |
|PFS           |PROGRAM FIRST SEGMENT                 |1344762       |659  |
|SEG           |SEGMENT OF A PROGRAM                  |1205998       |2257 |
|PRC           |PROMOTION OF UPCOMING CANADIAN PROGRAM|880600        |30   |
|PGI           |PROGRAM INFOMERCIAL                   |679182        |1800 |
|PRO           |PROMOTION OF NON-CANADIAN PROGRAM     |335701        |30   |
|OFF           |SCHEDULED OFF AIR TIME PERIOD         |142279        |2690 |
|ID            |NETWORK IDENTIFICATION MESSAGE        |74926         |5    |
|NRN           |No recognized nationality             |59686         |30   |

#### when  
when похожий метод на фильтр, но фильтр удаляет то, что не подходит, а when заменяет чем-либо. По умолчанию заменяет на пропуски, это можно регулировать методом otherwise  
ф-ия trim удаляет пробелы с краев строки. Если ее не использовать, то isin не сработает


In [None]:
#when
full_log.select(
    f.when(
        f.trim(f.col('ProgramClassCD')).isin(["COM"]),
        f.col("ProgramClassCD")
    )
    .otherwise(0)
    .alias('like_filter')
).show(5)
#filter
full_log.filter(
    f.trim(f.col('ProgramClassCD')).isin(["COM"])
).select(f.col('ProgramClassCD')).show(5)



+-----------+
|like_filter|
+-----------+
|          0|
|          0|
|       COM |
|       COM |
|       COM |
+-----------+
only showing top 5 rows

+--------------+
|ProgramClassCD|
+--------------+
|          COM |
|          COM |
|          COM |
|          COM |
|          COM |
+--------------+
only showing top 5 rows



группируем данные по LogIdentifierID, обнуляем duration_seconds там, где ProgramClassCD не соответствует рекламе, аггрегируем суммой и получаем столбец duration_commercial для каждого элемента LogIdentifierID, назваем столбец duration_seconds. Также аггрегируем суммой обычный столбец duration_seconds и получаем duration_total.  
  
методом withColumn создаем commercial_ratio путем деления duration_commercial на duration_total  

сортируем по commercial_ratio

доп задание:  
правительство канады попросило считать prc как 0.75 коммерческой секунды, поэтому после when добавляем еще when, где прописываем эту логику  
Так же вывод commercial_ratio был округлен до 0.1 функцией round, которая принимает значение и количество знаков после запятой

In [None]:
answer = full_log.groupby("LogIdentifierID").agg(
    f.sum(
        f.when(
            f.trim(f.col("ProgramClassCD")).isin(
                ["COM", "PGI", "PRO", "LOC", "SPO", "MER", "SOL"]
            ),
            f.col("duration_seconds"),
        ).when(
            f.trim(f.col('ProgramClassCD')) == 'PRC',
            f.col('duration_seconds') * 0.75
        ).otherwise(0)
    ).alias("duration_commercial"),
    f.sum("duration_seconds").alias("duration_total"),
)
answer.show()

answer = answer.withColumn(
    "commercial_ratio",
    f.round(f.col('duration_commercial')/f.col('duration_total'),1)
)

answer.orderBy(f.col('commercial_ratio'), ascending=False).show()

+---------------+-------------------+--------------+
|LogIdentifierID|duration_commercial|duration_total|
+---------------+-------------------+--------------+
|           CJCO|           16657.25|        106174|
|          BRAVO|            20747.5|        108920|
|           CFTF|                0.0|          1805|
|           CKCS|           10225.75|         97158|
|           CJNT|            25736.5|        112904|
|           CKES|            10122.0|         97019|
|           CHBX|            29727.5|        108007|
|         BBCKID|             3431.0|         92104|
|           BOOK|           18746.25|        105885|
|           CHAN|            24443.5|        107938|
|         CEVASI|            18345.0|        107180|
|            CMT|            21571.5|        108942|
|           CIII|           26190.75|        107812|
|           CFMT|           25597.75|        102688|
|           CFGS|            29872.5|         91500|
|           CBKT|           16128.75|        1

#### Работа с пропусками  
можно их просто удалить, используя drop. Аргумент subset

In [None]:
answer_no_null = answer.dropna(subset=["commercial_ratio"])
answer_no_null.orderBy(f.col('commercial_ratio'), ascending=True).show()

+---------------+-------------------+--------------+----------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio|
+---------------+-------------------+--------------+----------------+
|         MOVIEP|             2798.0|         86400|             0.0|
|           THTV|            4186.25|         92253|             0.0|
|          SLVSC|             3060.0|         89722|             0.0|
|         BBCKID|             3431.0|         92104|             0.0|
|          ATN13|                0.0|         86400|             0.0|
|           CICA|            2990.25|         90140|             0.0|
|           OTN1|              240.0|         86730|             0.0|
|           ATN8|             1623.0|         88153|             0.0|
|         SCSD03|             3577.5|         86400|             0.0|
|           OTN3|                0.0|         86400|             0.0|
|            TCC|             4281.0|         91081|             0.0|
|           CFTF|   

замена нулем

In [None]:
answer_no_null = answer.fillna(0)
answer_no_null.orderBy(f.col('commercial_ratio'), ascending=True).show()

# замена пропусков отдельно для каждого столбца
answer_no_null = answer.fillna(
 {"duration_commercial": 0, "duration_total": 0, "commercial_ratio": 0}
)

+---------------+-------------------+--------------+----------------+
|LogIdentifierID|duration_commercial|duration_total|commercial_ratio|
+---------------+-------------------+--------------+----------------+
|         MOVIEP|             2798.0|         86400|             0.0|
|           TMN1|             2393.0|         86400|             0.0|
|          SLVSC|             3060.0|         89722|             0.0|
|         BBCKID|             3431.0|         92104|             0.0|
|          ATN13|                0.0|         86400|             0.0|
|           CICA|            2990.25|         90140|             0.0|
|           OTN1|              240.0|         86730|             0.0|
|           ATN8|             1623.0|         88153|             0.0|
|         SCSD03|             3577.5|         86400|             0.0|
|           OTN3|                0.0|         86400|             0.0|
|            TCC|             4281.0|         91081|             0.0|
|           CFTF|   