# Zadanie: Wyznaczenie 3 najpopularniejszych języków dla danej kategorii dla wszystkich aplikacji których cena jest większa bądź równa średniej cenie dla danej aplikacji
## Loading Data

In [None]:
data = spark.read.csv('/user/wawrzynimaci/data/iosdata.csv', header='true', inferSchema='true')

## Schemat danych oraz kolumny 

In [None]:
print(data.schema)
data.columns

In [None]:
print(f'Liczba początkowa rekordów wczytanej bazy: {data.count()}')

## W tym kroku tworzę df, która będzie przechowywała średnią cenę aplikacji dla każdej kategorii.

In [None]:
from pyspark.sql import functions as F
avgCategoryPrice = data.filter("Price_USD is not NULL").where(data.Price_USD >= 0).groupBy('Primary_Genre').avg('Price_USD')
avgCategoryPrice = avgCategoryPrice.withColumnRenamed("avg(Price_USD)", "AvgVal").withColumnRenamed("Primary_Genre", "Genre")

## Średnie cene dla każdej kategorii.

In [None]:
avgCategoryPrice.show(30)

## Teraz usuwamy wszystkie te wartości, których cena jest mniejsza niż średnia cena dla kategorii 

In [None]:
dataGreaterAvge = avgCategoryPrice.join(data, avgCategoryPrice.Genre == data.Primary_Genre)
dataGreaterAvge = dataGreaterAvge.where(dataGreaterAvge.Price_USD >= dataGreaterAvge.AvgVal)

In [None]:
print('Liczba rekordów po usunięciu wartości, których cena była mniejsza od średniej ceny dla danej kategorii:')
print(f'{dataGreaterAvge.count()}')

## Teraz tworzymy nowy DF na którym będziemy dalej pracować i analizować dane

In [None]:
correctData = dataGreaterAvge.select('Price_USD','Primary_Genre','Languages')

In [None]:
print('Liczba rekordów po skopiowaniu df:')
print(f'{correctData.count()}')

## Rozbijam teraz tablicę N elementową na N wierwszy i czyszczę błędne spacje

In [None]:
from pyspark.sql.functions import explode, split, regexp_replace
explodeLanguages = correctData.select(correctData.Primary_Genre, explode(split(regexp_replace(correctData.Languages,"[\\[|\\'|\\]]",""),",")))
explodeLanguages = explodeLanguages.withColumnRenamed("col", "Language")
explodeLanguages.columns

correctExplodeLanguages = explodeLanguages.select(explodeLanguages.Primary_Genre, regexp_replace(explodeLanguages.Language, " | ", "")).withColumnRenamed("regexp_replace(Language,  | , )", "Language")

## Zliczanie języków - zlicza dla danej kategorii liczbę wystąpień danego języka.

In [None]:
languages_count = correctExplodeLanguages.groupBy(correctExplodeLanguages.Primary_Genre, correctExplodeLanguages.Language).count().orderBy(correctExplodeLanguages.Primary_Genre, F.desc('count'))

## Pobranie 3 języków które najczęściej występują dla każdej kategorii 

In [None]:
from pyspark.sql.window import Window as W
ranked =  languages_count.withColumn('rank', F.rank().over(W.partitionBy('Primary_Genre').orderBy("Primary_Genre", F.desc('count'))))
ranked.where(ranked.rank <= 3).orderBy("Primary_Genre", F.desc('count')).show(120)

In [None]:
spark.stop()