In [1]:
%%bash
apt-get install openjdk-17-jdk-headless -qq > /dev/null

In [2]:
%%bash
wget -q https://archive.apache.org/dist/spark/spark-3.3.0/spark-3.3.0-bin-hadoop3.tgz
tar xf spark-3.3.0-bin-hadoop3.tgz

In [3]:
import os
os.environ['JAVA_HOME']='/usr/lib/jvm/java-1.17.0-openjdk-amd64'
os.environ['SPARK_HOME']='/content/spark-3.3.0-bin-hadoop3'

In [4]:
%%bash
pip install findspark



In [5]:
import findspark
findspark.init()

In [6]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression,DecisionTreeClassifier,RandomForestClassifier

In [7]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


#### Программа А

In [None]:
spark = SparkSession.builder\
    .master('local')\
    .appName("My Spark Session").getOrCreate()
df = spark.read.csv(r"/content/gdrive/MyDrive/brooklyn_sales_map.csv",header=True)
df.show()

+---+--------+--------------------+-----------------------+---------+-----+----+--------+--------------+--------------------+----------------+--------+-----------------+----------------+-----------+---------+----------+----------+-----------------+----------------------+----------+----------+------------+---------+---+------+------+----------+-------+-------+--------+----------+----------+----------+---------+----------+--------+-------------------+---------+---------+---------+---------+--------+--------+-------+-------+-------+---------+---------+---------+-------+---------+---------+--------------------+-------+--------+-------+-------+----------+----------+----------+---------+----------+---------+----------+--------+---------+--------+----------+--------+--------+---------+---------+---+--------+----------+-------+--------+----------+---------+----------+---------+---------+----------+----------+--------------------+--------+--------+--------+-------+--------+--------+----------+-

##### Задание 1

In [None]:
df.printSchema()

In [None]:
df.select('year_built').show()

+----------+
|year_built|
+----------+
|      2002|
|         0|
|      1924|
|      1970|
|      1927|
|         0|
|      1928|
|      2012|
|         0|
|      1912|
|         0|
|         0|
|      2009|
|      1967|
|      1920|
|      1992|
|      1920|
|      2014|
|         0|
|      1962|
+----------+
only showing top 20 rows



In [None]:
year_built_mean = df.select('year_built').groupBy().agg(F.mean('year_built').alias('mean'))
year_built_mean_val = year_built_mean.collect()[0]['mean']
print("Средний год постройки жилья:", year_built_mean_val)

Средний год постройки жилья: 1701.6663067976863


In [None]:
df_year_buit_and_deviation = df.select('year_built')
df_year_buit_and_deviation = df_year_buit_and_deviation.withColumn('deviation, %', (df_year_buit_and_deviation.year_built - year_built_mean_val) / df_year_buit_and_deviation.year_built * 100)
df_year_buit_and_deviation.show()

+----------+------------------+
|year_built|      deviation, %|
+----------+------------------+
|      2002|15.001682977138547|
|         0|              null|
|      1924|11.555805259995516|
|      1970|13.620999654939784|
|      1927|11.693497312003826|
|         0|              null|
|      1928|11.739299439954031|
|      2012|15.424139821188554|
|         0|              null|
|      1912|11.000716171669128|
|         0|              null|
|         0|              null|
|      2009|15.297844360493466|
|      1967|13.489257407336744|
|      1920| 11.37154652095384|
|      1992|14.574984598509724|
|      1920| 11.37154652095384|
|      2014|15.508127765755397|
|         0|              null|
|      1962|13.268791702462474|
+----------+------------------+
only showing top 20 rows



##### Задание 2

Мы можем использовать методы groupBy() и agg() для подсчета количества различных домов для каждой улицы.

Этот код создает DataFrame с данными, группирует данные по столбцу "address9" и использует функцию countDistinct() для подсчета количества уникальных значений в столбце "building_class_category".

Метод F.countDistinct() является функцией в модуле pyspark.sql.functions Apache Spark. Он используется для подсчета количества уникальных значений в столбце DataFrame.

In [None]:
df_street_building = df.groupBy('address9').agg(F.countDistinct('building_class_category').alias("Number of unique houses on the street"))
df_street_building.show()

+--------------------+-------------------------------------+
|            address9|Number of unique houses on the street|
+--------------------+-------------------------------------+
|119 NORTH 11TH ST...|                                    0|
|6609 FORT HAMILTO...|                                    1|
|        784 4 AVENUE|                                    1|
|   8704-08 18 AVENUE|                                    3|
|   228 QUINCY STREET|                                    1|
|285 PROSPECT PLAC...|                                    1|
|1626-1628 UTICA A...|                                    1|
|2700 VOORHIES AVENUE|                                    1|
|999 EAST 108TH ST...|                                    1|
|60 PINEAPPLE STRE...|                                    1|
|     547 49TH STREET|                                    1|
|    6614 14TH AVENUE|                                    1|
|   151 11TH   STREET|                                    1|
|400 EAST 17TH STR...|  

##### Задание 3

Для сортировки датасета по нескольким столбцам одновременно в PySpark, вы можете использовать метод orderBy().

In [None]:
# Сортировка датасета по возрастанию цены продажи и убыванию индексов
sorted_df = df.orderBy('sale_price', df['zip_code'].desc())
# Выводим результат
sorted_df.select('sale_price','zip_code').show()

+----------+--------+
|sale_price|zip_code|
+----------+--------+
|         0|   33803|
|         0|   33803|
|         0|   11416|
|         0|   11416|
|         0|   11416|
|         0|   11416|
|         0|   11416|
|         0|   11416|
|         0|   11416|
|         0|   11416|
|         0|   11416|
|         0|   11416|
|         0|   11249|
|         0|   11249|
|         0|   11249|
|         0|   11249|
|         0|   11249|
|         0|   11249|
|         0|   11249|
|         0|   11249|
+----------+--------+
only showing top 20 rows



##### Задание 4

In [None]:
# Группируем данные по соседству и категории класса здания и считаем максимальную цену продажи и количество зданий
sorted_df = df.groupBy('neighborhood', 'building_class_category')\
    .agg(F.max('sale_price').alias('max_sale_price'), F.count('*').alias('building_count'))\
    .orderBy('neighborhood', 'building_class_category')

В приведенном примере создается DataFrame с данными, а затем с помощью методов groupBy(), agg() и max() группируются данные по столбцам "neighborhood" и "building_class_category". Затем применяется агрегатная функция max() для подсчета наибольшей цены продажи и используется count() для подсчета количества зданий. Результат сохраняется в DataFrame sorted_df и выводится на экран с помощью метода show().

In [None]:
sorted_df.select('neighborhood','building_count','max_sale_price').show()

+------------+--------------+--------------+
|neighborhood|building_count|max_sale_price|
+------------+--------------+--------------+
|        3004|             2|        346788|
|        3019|             1|             0|
|  BATH BEACH|             6|             0|
|  BATH BEACH|           509|         9e+05|
|  BATH BEACH|          1808|         9e+05|
|  BATH BEACH|           756|         9e+05|
|  BATH BEACH|           450|        753505|
|  BATH BEACH|            20|         7e+05|
|  BATH BEACH|            28|        960000|
|  BATH BEACH|             3|             0|
|  BATH BEACH|           207|        999988|
|  BATH BEACH|            11|        850000|
|  BATH BEACH|            19|        855421|
|  BATH BEACH|           406|         94000|
|  BATH BEACH|            24|        435000|
|  BATH BEACH|           102|         98000|
|  BATH BEACH|             4|        399000|
|  BATH BEACH|            69|        550000|
|  BATH BEACH|             6|        780000|
|  BATH BE

In [None]:
# Закрытие сессии
spark.stop()

#### Программа B

In [37]:
spark = SparkSession.builder\
    .master('local')\
    .appName("My Spark Session").getOrCreate()
df = spark.read.csv(r"/content/gdrive/MyDrive/smoke_detection_iot.csv",header=True)
df.show()

+---+----------+--------------+-----------+---------+---------+------+-----------+-------------+-----+-----+-----+-----+-----+---+----------+
|_c0|       UTC|Temperature[C]|Humidity[%]|TVOC[ppb]|eCO2[ppm]|Raw H2|Raw Ethanol|Pressure[hPa]|PM1.0|PM2.5|NC0.5|NC1.0|NC2.5|CNT|Fire Alarm|
+---+----------+--------------+-----------+---------+---------+------+-----------+-------------+-----+-----+-----+-----+-----+---+----------+
|  0|1654733331|          20.0|      57.36|        0|      400| 12306|      18520|      939.735|  0.0|  0.0|  0.0|  0.0|  0.0|  0|         0|
|  1|1654733332|        20.015|      56.67|        0|      400| 12345|      18651|      939.744|  0.0|  0.0|  0.0|  0.0|  0.0|  1|         0|
|  2|1654733333|        20.029|      55.96|        0|      400| 12374|      18764|      939.738|  0.0|  0.0|  0.0|  0.0|  0.0|  2|         0|
|  3|1654733334|        20.044|      55.28|        0|      400| 12390|      18849|      939.736|  0.0|  0.0|  0.0|  0.0|  0.0|  3|         0|
|  4|1

In [38]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- UTC: string (nullable = true)
 |-- Temperature[C]: string (nullable = true)
 |-- Humidity[%]: string (nullable = true)
 |-- TVOC[ppb]: string (nullable = true)
 |-- eCO2[ppm]: string (nullable = true)
 |-- Raw H2: string (nullable = true)
 |-- Raw Ethanol: string (nullable = true)
 |-- Pressure[hPa]: string (nullable = true)
 |-- PM1.0: string (nullable = true)
 |-- PM2.5: string (nullable = true)
 |-- NC0.5: string (nullable = true)
 |-- NC1.0: string (nullable = true)
 |-- NC2.5: string (nullable = true)
 |-- CNT: string (nullable = true)
 |-- Fire Alarm: string (nullable = true)



In [39]:
new_column_names = ['_c0','UTC','Temperature[C]','Humidity[%]','TVOC[ppb]','eCO2[ppm]','Raw H2','Raw Ethanol','Pressure[hPa]','PM10','PM25','NC05','NC10','NC25','CNT','Fire Alarm']
df = df.toDF(*new_column_names)
df.columns

['_c0',
 'UTC',
 'Temperature[C]',
 'Humidity[%]',
 'TVOC[ppb]',
 'eCO2[ppm]',
 'Raw H2',
 'Raw Ethanol',
 'Pressure[hPa]',
 'PM10',
 'PM25',
 'NC05',
 'NC10',
 'NC25',
 'CNT',
 'Fire Alarm']

In [40]:
feature_columns = ['_c0','UTC','Temperature[C]','Humidity[%]','TVOC[ppb]','eCO2[ppm]','Raw H2','Raw Ethanol','Pressure[hPa]','PM10','PM25','NC05','NC10','NC25','CNT']
target_column = ['Fire Alarm']

all_columns = feature_columns + target_column

In [41]:
string_col_2_float = ['PM10', 'NC10', 'PM25', 'NC05', 'NC25', 'Humidity[%]', 'Pressure[hPa]', 'Temperature[C]']

In [42]:
# Из string в numeric
for column in all_columns:
  if column in string_col_2_float:
    df = df.withColumn(column, F.col(column).cast('Double'))
  else:
    df = df.withColumn(column, F.col(column).cast('Integer'))

In [43]:
def df_2_vector(dataframe, inputCols, outputCol):
  assembler = VectorAssembler(
  inputCols=inputCols, outputCol=outputCol)
  assembled_df = assembler.transform(dataframe)
  return assembled_df.randomSplit([0.8, 0.2])

In [44]:
cols = feature_columns
train_df, test_df = df_2_vector(df, cols, 'vectorized_data')

In [45]:
cell_value = train_df.collect()[2]['vectorized_data']
print(cell_value)

[3.0,1654733334.0,20.044,55.28,0.0,400.0,12390.0,18849.0,939.736,0.0,0.0,0.0,0.0,0.0,3.0]


##### Логистическая регрессия

In [46]:
logistic_regression_model = LogisticRegression(labelCol= 'Fire Alarm', featuresCol = 'vectorized_data', maxIter=10000, regParam=0.1, elasticNetParam=0.8)
model_lr = logistic_regression_model.fit(train_df)

In [47]:
predictions_lr = model_lr.transform(test_df)

In [48]:
type(predictions_lr)

pyspark.sql.dataframe.DataFrame

In [49]:
predictions_lr.select('Fire Alarm','probability','prediction').show(truncate=False)

+----------+----------------------------------------+----------+
|Fire Alarm|probability                             |prediction|
+----------+----------------------------------------+----------+
|0         |[0.4942754172404245,0.5057245827595755] |1.0       |
|0         |[0.5019699171808899,0.4980300828191101] |0.0       |
|0         |[0.502772051440455,0.497227948559545]   |0.0       |
|0         |[0.5031633642885361,0.49683663571146386]|0.0       |
|0         |[0.5043386210021763,0.4956613789978237] |0.0       |
|0         |[0.504926232192021,0.49507376780797896] |0.0       |
|0         |[0.5055360131037703,0.4944639868962297] |0.0       |
|0         |[0.5058116527273412,0.4941883472726588] |0.0       |
|0         |[0.5024062599599176,0.4975937400400824] |0.0       |
|0         |[0.5013156659382825,0.4986843340617175] |0.0       |
|0         |[0.49857032152522174,0.5014296784747783]|1.0       |
|0         |[0.4970911011111644,0.5029088988888356] |1.0       |
|0         |[0.4948542202

In [50]:
print('Модель Логистической регрессии. Правильные предсказания')
TP = predictions_lr[(predictions_lr['Fire Alarm']==1)&(predictions_lr['prediction']==1)].count()
print('Количество верно идентифицированных угроз пожарной тревоги (правильное предсказание):',TP)
TN = predictions_lr[(predictions_lr['Fire Alarm']==0)&(predictions_lr['prediction']==0)].count()
print('Количество верно идентифицированных не угроз пожарной тревоги (правильное предсказание):',TN)

Модель Логистической регрессии. Правильные предсказания
Количество верно идентифицированных угроз пожарной тревоги (правильное предсказание): 8715
Количество верно идентифицированных не угроз пожарной тревоги (правильное предсказание): 2742


In [51]:
print('Модель Логистической регрессии. Ошибки')
FP = predictions_lr[(predictions_lr['Fire Alarm']==0)&(predictions_lr['prediction']==1)].count()
print('Количество ложно идентифицированных не угроз пожарной тревоги (ошибочное предсказание):',FP)
FN = predictions_lr[(predictions_lr['Fire Alarm']==1)&(predictions_lr['prediction']==0)].count()
print('Количество ложно идентифицированных угроз пожарной тревоги (ошибочное предсказание):',FN)

Модель Логистической регрессии. Ошибки
Количество ложно идентифицированных не угроз пожарной тревоги (ошибочное предсказание): 829
Количество ложно идентифицированных угроз пожарной тревоги (ошибочное предсказание): 239


In [52]:
#Метрики качества моделей
#Accuracy - Верность  Acc. = (TP+TN)/(TP+TN+FP+FN)
acc_lr = (TP+TN)/(TP+TN+FP+FN)
print('Верность модели (логистическая регрессия):', round(acc_lr,2))

#Precision - Точность Prec. = TP/(TP+FP)
precision_lr = TP/(TP+FP)
print('Точность модели (логистическая регрессия):', round(precision_lr,2))

#Recall - Полнота Recall = TP/(TP+FN)
recall_lr = TP/(TP+FN)
print('Полнота модели (логистическая регрессия):', round(recall_lr,2))

Верность модели (логистическая регрессия): 0.91
Точность модели (логистическая регрессия): 0.91
Полнота модели (логистическая регрессия): 0.97


##### Дерево решений

In [53]:
#Decision Tree Model
tree_model = DecisionTreeClassifier(featuresCol='vectorized_data',
                                  labelCol='Fire Alarm',maxDepth=3).fit(train_df)

tree_predictions = tree_model.transform(test_df)

In [54]:
tree_predictions.select('Fire Alarm','probability','prediction').show(truncate=False)

+----------+----------------------------------------+----------+
|Fire Alarm|probability                             |prediction|
+----------+----------------------------------------+----------+
|0         |[0.9349548842683405,0.06504511573165947]|0.0       |
|0         |[0.9349548842683405,0.06504511573165947]|0.0       |
|0         |[0.9349548842683405,0.06504511573165947]|0.0       |
|0         |[0.9349548842683405,0.06504511573165947]|0.0       |
|0         |[0.9349548842683405,0.06504511573165947]|0.0       |
|0         |[0.9349548842683405,0.06504511573165947]|0.0       |
|0         |[0.9349548842683405,0.06504511573165947]|0.0       |
|0         |[0.9349548842683405,0.06504511573165947]|0.0       |
|0         |[0.9349548842683405,0.06504511573165947]|0.0       |
|0         |[0.9349548842683405,0.06504511573165947]|0.0       |
|0         |[0.9349548842683405,0.06504511573165947]|0.0       |
|0         |[0.9349548842683405,0.06504511573165947]|0.0       |
|0         |[0.9349548842

In [55]:
print('Модель Дерева решений. Правильные предсказания')
TP = tree_predictions[(tree_predictions['Fire Alarm']==1)&(tree_predictions['prediction']==1)].count()
print('Количество верно идентифицированных угроз пожарной тревоги (правильное предсказание):',TP)
TN = tree_predictions[(tree_predictions['Fire Alarm']==0)&(tree_predictions['prediction']==0)].count()
print('Количество верно идентифицированных не угроз пожарной тревоги (правильное предсказание):',TN)

Модель Дерева решений. Правильные предсказания
Количество верно идентифицированных угроз пожарной тревоги (правильное предсказание): 8716
Количество верно идентифицированных не угроз пожарной тревоги (правильное предсказание): 3486


In [56]:
print('Модель Дерева решений. Ошибки')
FP = tree_predictions[(tree_predictions['Fire Alarm']==0)&(tree_predictions['prediction']==1)].count()
print('Количество ложно идентифицированных не угроз пожарной тревоги (ошибочное предсказание):',FP)
FN = tree_predictions[(tree_predictions['Fire Alarm']==1)&(tree_predictions['prediction']==0)].count()
print('Количество ложно идентифицированных угроз пожарной тревоги (ошибочное предсказание):',FN)

Модель Дерева решений. Ошибки
Количество ложно идентифицированных не угроз пожарной тревоги (ошибочное предсказание): 85
Количество ложно идентифицированных угроз пожарной тревоги (ошибочное предсказание): 238


In [57]:
#Метрики качества моделей
#Accuracy - Верность  Acc. = (TP+TN)/(TP+TN+FP+FN)
acc_tree = (TP+TN)/(TP+TN+FP+FN)
print('Верность модели (дерево решений):', round(acc_tree,2))

#Precision - Точность Prec. = TP/(TP+FP)
precision_tree = TP/(TP+FP)
print('Точность модели (дерево решений):', round(precision_tree,2))

#Recall - Полнота Recall = TP/(TP+FN)
recall_tree = TP/(TP+FN)
print('Полнота модели (дерево решений):', round(recall_tree,2))

Верность модели (дерево решений): 0.97
Точность модели (дерево решений): 0.99
Полнота модели (дерево решений): 0.97


##### Случайный лес

In [58]:
rf_model = RandomForestClassifier(featuresCol='vectorized_data',
                                  labelCol='Fire Alarm',maxDepth=5, numTrees=5).fit(train_df)

rf_predictions = rf_model.transform(test_df)
rf_predictions.select('Fire Alarm','probability','prediction').show(truncate=False)

+----------+-----------------------------------------+----------+
|Fire Alarm|probability                              |prediction|
+----------+-----------------------------------------+----------+
|0         |[0.9998393086184751,1.606913815248496E-4]|0.0       |
|0         |[0.9929427568943373,0.00705724310566278] |0.0       |
|0         |[0.9929427568943373,0.00705724310566278] |0.0       |
|0         |[0.9929427568943373,0.00705724310566278] |0.0       |
|0         |[0.9929427568943373,0.00705724310566278] |0.0       |
|0         |[0.9929427568943373,0.00705724310566278] |0.0       |
|0         |[0.9998393086184751,1.606913815248496E-4]|0.0       |
|0         |[0.9998393086184751,1.606913815248496E-4]|0.0       |
|0         |[0.9998393086184751,1.606913815248496E-4]|0.0       |
|0         |[0.9998393086184751,1.606913815248496E-4]|0.0       |
|0         |[0.9998393086184751,1.606913815248496E-4]|0.0       |
|0         |[0.9998393086184751,1.606913815248496E-4]|0.0       |
|0        

In [59]:
print('Модель Случайного леса. Правильные предсказания')
TP = rf_predictions[(rf_predictions['Fire Alarm']==1)&(rf_predictions['prediction']==1)].count()
print('Количество верно идентифицированных угроз пожарной тревоги (правильное предсказание):',TP)
TN = rf_predictions[(rf_predictions['Fire Alarm']==0)&(rf_predictions['prediction']==0)].count()
print('Количество верно идентифицированных не угроз пожарной тревоги (правильное предсказание):',TN)

Модель Случайного леса. Правильные предсказания
Количество верно идентифицированных угроз пожарной тревоги (правильное предсказание): 8954
Количество верно идентифицированных не угроз пожарной тревоги (правильное предсказание): 3566


In [60]:
print('Модель Случайного леса. Ошибки')
FP = rf_predictions[(rf_predictions['Fire Alarm']==0)&(rf_predictions['prediction']==1)].count()
print('Количество ложно идентифицированных не угроз пожарной тревоги (ошибочное предсказание):',FP)
FN = rf_predictions[(rf_predictions['Fire Alarm']==1)&(rf_predictions['prediction']==0)].count()
print('Количество ложно идентифицированных угроз пожарной тревоги (ошибочное предсказание):',FN)

Модель Случайного леса. Ошибки
Количество ложно идентифицированных не угроз пожарной тревоги (ошибочное предсказание): 5
Количество ложно идентифицированных угроз пожарной тревоги (ошибочное предсказание): 0


In [63]:
#Метрики качества моделей
#Accuracy - Верность  Acc. = (TP+TN)/(TP+TN+FP+FN)
acc_rf = (TP+TN)/(TP+TN+FP+FN)
print('Верность модели (случайный лес):', round(acc_rf,4))

#Precision - Точность Prec. = TP/(TP+FP)
precision_rf = TP/(TP+FP)
print('Точность модели (случайный лес):', round(precision_rf,4))

#Recall - Полнота Recall = TP/(TP+FN)
recall_rf = TP/(TP+FN)
print('Полнота модели (случайный лес):', round(recall_rf,4))

Верность модели (случайный лес): 0.9996
Точность модели (случайный лес): 0.9994
Полнота модели (случайный лес): 1.0


In [62]:
# Закрытие сессии
spark.stop()

#### Программа C

In [16]:
from itertools import islice
from pyspark import SparkContext
from pyspark.sql.functions import col

In [8]:
spark = SparkSession.builder.master('local[*]').getOrCreate()

# Создание SparkContext
sc = spark.sparkContext

In [15]:
def take(n, iterable):
    return list(islice(iterable, n))

def sort_dict_by_value(dict, reverse=True):
    return {key: value for key, value in sorted(dict.items(), key=lambda x: x[1], reverse=reverse)}


##### 10 самых популярных хештегов

In [35]:
main_df = spark.read.csv('/content/gdrive/MyDrive/FIFA.csv', header=True, multiLine=True, escape="\"")

groupedHashTagsByCount = main_df.rdd \
    .filter(lambda x: x['Hashtags'] != None) \
    .flatMap(lambda x: x['Hashtags'].split(',')) \
    .countByValue()
groupedHashTagsByCountSorted = sort_dict_by_value(groupedHashTagsByCount)
top10Hashtags =take(10, groupedHashTagsByCountSorted.items())
top10HashtagsDf = spark.createDataFrame(data=top10Hashtags, schema = ["hashtag","count"])
print('10 наиболее упоминаемых хэштегов')
top10HashtagsDf.show()

10 наиболее упоминаемых хэштегов
+-------------+------+
|      hashtag| count|
+-------------+------+
|     WorldCup|398744|
|          FRA| 66128|
|     worldcup| 33195|
|          CRO| 32274|
|WorldCupFinal| 28446|
|          ENG| 25165|
|       FRAARG| 21413|
|       FRABEL| 18862|
|       FRACRO| 17982|
|          BEL| 15728|
+-------------+------+



##### 10 самых популярных столиц, из которых отправляли твиты

In [36]:
capitals_df = spark.read.csv('country-list.csv', header=True, multiLine=True, escape="\"")
capitals_df = capitals_df.drop('country', 'type')

new_df = main_df.join(capitals_df, capitals_df.capital == main_df.Place, "leftouter")

groupedCapitalsByCount = new_df.rdd\
    .filter(lambda x: x['capital'] != None)\
    .map(lambda x: x['Place'])\
    .countByValue()\

groupedCapitalsByCountSorted = sort_dict_by_value(groupedCapitalsByCount)
top10Capitals =take(10, groupedCapitalsByCountSorted.items())
top10CapitalsDf = spark\
    .createDataFrame(data=top10Capitals, schema = ["capital","count"])
print('10 столиц, из которых твиты отправлялись наиболее часто')
top10CapitalsDf.show()

AnalysisException: ignored

##### Столицы, с самыми популярными хештегами в топ 10 хештегов

In [None]:
new_df = new_df\
.filter(col('capital').isNotNull())\
.drop(col('capital'))\
.join(top10CapitalsDf, top10CapitalsDf.capital == new_df.Place, "leftouter")

capitalhashTagDf = new_df.rdd\
.filter(lambda x: x['capital'] != None) \
.filter(lambda x: x['Hashtags'] != None) \
.flatMap(lambda x: [(x['capital'], hashtag) for hashtag in x['Hashtags'].split(',')])\
.toDF()\
.filter(col('_2') != 'WorldCup')

countedCapitalHashTag = capitalhashTagDf.groupBy('_1', '_2').count()

counted = countedCapitalHashTag\
.groupBy('_1')\
.agg(F.max('count').alias('num_of_hashtags'))\
.withColumnRenamed("_1", "capital")

join_conditions = [countedCapitalHashTag['_1'] == counted['capital'], countedCapitalHashTag['count'] == counted['num_of_hashtags']]
topCapitalsHashTag = counted \
.join(countedCapitalHashTag, join_conditions) \
.filter(col('num_of_hashtags').isNotNull()) \
.drop(col('_1')) \
.drop(col('count'))

topCapitalsHashtagsWhichPopular = topCapitalsHashTag \
.join(top10HashtagsDf, \
    top10HashtagsDf['hashTag'] == topCapitalsHashTag['_2']) \
.filter(col('hashTag').isNotNull()) \
.drop(col('count')) \
.drop(col('_2'))

print('Столицы, у которых самый популярный хэштег относится к 10 самым популярным хэштегам в целом')
topCapitalsHashtagsWhichPopular.show()