In [26]:
import os
import pyspark
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark import  SparkContext
from pyspark.sql import functions as f
from pyspark.sql.functions import col

appName = "PySpark Task 1"
master = 'local[*]'

spark = SparkSession.builder \
    .enableHiveSupport() \
    .appName(appName) \
    .getOrCreate()

/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

**1. Сгенерировать DataFrame из трёх колонок (row_id, discipline, season) - олимпийские дисциплины по сезонам:**
*   **row_id - число порядкового номера строки;**
*   **discipline - наименование олимпийский дисциплины на английском (полностью маленькими буквами);**
*   **season - сезон дисциплины (summer / winter).**

***Укажите не менее чем по 5 дисциплин для каждого сезона.**  
**Сохраните DataFrame в csv-файл, разделитель колонок табуляция, первая строка должна содержать название колонок.**  
**Данные должны быть сохранены в виде 1 csv-файла а не множества маленьких.**

In [19]:
# Создание DataFrame
data = {
    'row_id': range(1, 11),
    'discipline': ['athletics', 'swimming', 'basketball', 'football', 'tennis', 'skiing', \
                   'hockey', 'figure_skating', 'snowboarding', 'curling'],
    'season': ['summer'] * 5 + ['winter'] * 5
}

data

{'row_id': range(1, 11),
 'discipline': ['athletics',
  'swimming',
  'basketball',
  'football',
  'tennis',
  'skiing',
  'hockey',
  'figure_skating',
  'snowboarding',
  'curling'],
 'season': ['summer',
  'summer',
  'summer',
  'summer',
  'summer',
  'winter',
  'winter',
  'winter',
  'winter',
  'winter']}

In [41]:
olympic_disciplines = pd.DataFrame(data)

olympic_disciplines

Unnamed: 0,row_id,discipline,season
0,1,athletics,summer
1,2,swimming,summer
2,3,basketball,summer
3,4,football,summer
4,5,tennis,summer
5,6,skiing,winter
6,7,hockey,winter
7,8,figure_skating,winter
8,9,snowboarding,winter
9,10,curling,winter


In [42]:
# Сохранение DataFrame в CSV-файл
olympic_disciplines.to_csv('olympic_disciplines.csv', sep='\t', index=False)

print("DataFrame сохранен в файл olympic_disciplines.csv")

DataFrame сохранен в файл olympic_disciplines.csv


In [25]:
# Чтение CSV-файла в DataFrame
df_olympic_disciplines_csv = pd.read_csv('olympic_disciplines.csv', sep='\t')

df_olympic_disciplines_csv

Unnamed: 0,row_id,discipline,season
0,1,athletics,summer
1,2,swimming,summer
2,3,basketball,summer
3,4,football,summer
4,5,tennis,summer
5,6,skiing,winter
6,7,hockey,winter
7,8,figure_skating,winter
8,9,snowboarding,winter
9,10,curling,winter


/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

**2. Прочитайте исходный файл "Athletes.csv".**  
**Посчитайте в разрезе дисциплин сколько всего спортсменов в каждой из дисциплин
принимало участие.**  
**Результат сохраните в формате parquet.**

In [31]:
# Чтение CSV-файла в DataFrame
df_athletes = pd.read_csv('Athletes.csv', sep=';')

df_athletes.head()

Unnamed: 0,Name,NOC,Discipline
0,AALERUD Katrine,Norway,Cycling Road
1,ABAD Nestor,Spain,Artistic Gymnastics
2,ABAGNALE Giovanni,Italy,Rowing
3,ABALDE Alberto,Spain,Basketball
4,ABALDE Tamara,Spain,Basketball


In [35]:
# Подсчет количества спортсменов в каждой дисциплине
result = df_athletes['Discipline'].value_counts().reset_index()
result.columns = ['discipline', 'count']

result

Unnamed: 0,discipline,count
0,Athletics,2068
1,Swimming,743
2,Football,567
3,Rowing,496
4,Hockey,406
5,Judo,373
6,Handball,343
7,Shooting,342
8,Sailing,336
9,Rugby Sevens,283


In [37]:
# Сохранение результата в формате Parquet
table = pa.Table.from_pandas(result)
pq.write_table(table, 'result.parquet')

print("Результат сохранен в файл result.parquet")

Результат сохранен в файл result.parquet


/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

**3. Прочитайте исходный файл "Athletes.csv".**  
**Посчитайте в разрезе дисциплин сколько всего спортсменов в каждой из дисциплин
принимало участие.**  
**Получившийся результат нужно объединить с сгенерированным вами DataFrame из 1-го задания и в итоге вывести количество участников, только по тем дисциплинам, что есть в вашем сгенерированном DataFrame.**  
**Результат сохраните в формате parquet.**

In [38]:
# Чтение CSV-файла в DataFrame
df_athletes = pd.read_csv('Athletes.csv', sep=';')

df_athletes.head()

Unnamed: 0,Name,NOC,Discipline
0,AALERUD Katrine,Norway,Cycling Road
1,ABAD Nestor,Spain,Artistic Gymnastics
2,ABAGNALE Giovanni,Italy,Rowing
3,ABALDE Alberto,Spain,Basketball
4,ABALDE Tamara,Spain,Basketball


In [43]:
# Подсчет количества спортсменов в каждой дисциплине
result_athletes = df_athletes['Discipline'].value_counts().reset_index()
result_athletes.columns = ['discipline', 'count']

result_athletes

Unnamed: 0,discipline,count
0,Athletics,2068
1,Swimming,743
2,Football,567
3,Rowing,496
4,Hockey,406
5,Judo,373
6,Handball,343
7,Shooting,342
8,Sailing,336
9,Rugby Sevens,283


In [44]:
# Преобразование значений в колонке 'discipline' к нижнему регистру
result_athletes['discipline'] = result_athletes['discipline'].apply(lambda x: x.lower())

result_athletes

Unnamed: 0,discipline,count
0,athletics,2068
1,swimming,743
2,football,567
3,rowing,496
4,hockey,406
5,judo,373
6,handball,343
7,shooting,342
8,sailing,336
9,rugby sevens,283


In [45]:
# Объединение результатов
merged_result = pd.merge(olympic_disciplines, result_athletes, on='discipline')

merged_result

Unnamed: 0,row_id,discipline,season,count
0,1,athletics,summer,2068
1,2,swimming,summer,743
2,3,basketball,summer,280
3,4,football,summer,567
4,5,tennis,summer,178
5,7,hockey,winter,406


In [46]:
# Вывод общего количества участников только по дисциплинам
total_athletes = merged_result['count'].sum()
print("Общее количество участников:", total_athletes)

Общее количество участников: 4242


In [47]:
# Сохранение итогового результата в формате Parquet
table = pa.Table.from_pandas(merged_result)
pq.write_table(table, 'final_result.parquet')

print("Итоговый результат сохранен в файл final_result.parquet")

Итоговый результат сохранен в файл final_result.parquet


In [48]:
# Сохранение DataFrame в CSV-файл
merged_result.to_csv('final_result.csv', sep='\t', index=False)

print("DataFrame сохранен в файл final_result.csv")

DataFrame сохранен в файл final_result.csv
