In [0]:
from pyspark.sql import Row, Column
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
#%fs rm -r projekat

##Prikupljanje i transformacija podataka
Prikupljeni su podaci o broju diplomiranih studenata po oblasti i po polu na teritoriji Republike Srbije, kao i njenim delovima. U daljem kodu, podaci su transformisani i na taj nacin prilagodjeni za dalju analizu.
Uzeti u obzir, da su podaci o diplomiranim studentima pronadjeni samo za studiranje po starom programu, dok novijih podataka nije bilo na sajtu data.gov.rs

In [0]:
file_location = "/FileStore/tables/diplomaOblasti.csv"

df = spark.read.option("inferSchema", "true").option("header", "true").csv(file_location, sep=';')
df.show(5, truncate=False)

+---------------+-----+----------------+---+----+--------+------------------------------------------+--------+
|idindikator    |IDTer|nTer            |mes|god |IDISCEDF|nISCEDF                                   |vrednost|
+---------------+-----+----------------+---+----+--------+------------------------------------------+--------+
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2016|0       |Укупно                                    |3175    |
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2016|0       |Генерички програми и квалификације        |0       |
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2016|1       |Образовање                                |209     |
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2016|2       |Уметности и хуманистичке науке            |562     |
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2016|3       |Друштвене науке, новинарство и информисање|396     |
+---------------+-----+----------------+---+----+--------+------------------------------------------+--------+
o

Kolonu mes odbacujemo posto nam nece sluziti nicemu. U daljim koracima prilagodjavamo ime kolona, menjamo nazive teritorija na latinicu, a vrednosti kolone "nazivOblasti" korigujemo da su napisane malim slovima.

In [0]:
df.select("mes").distinct().show()

+---+
|mes|
+---+
|  0|
+---+



In [0]:
df = df.drop("mes")
df.show(5, truncate=False)

+---------------+-----+----------------+----+--------+------------------------------------------+--------+
|idindikator    |IDTer|nTer            |god |IDISCEDF|nISCEDF                                   |vrednost|
+---------------+-----+----------------+----+--------+------------------------------------------+--------+
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|2016|0       |Укупно                                    |3175    |
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|2016|0       |Генерички програми и квалификације        |0       |
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|2016|1       |Образовање                                |209     |
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|2016|2       |Уметности и хуманистичке науке            |562     |
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|2016|3       |Друштвене науке, новинарство и информисање|396     |
+---------------+-----+----------------+----+--------+------------------------------------------+--------+
only showing top 5 rows



In [0]:
df = df.withColumnRenamed("god", "godina")\
       .withColumnRenamed("IDISCEDF", "IDOblasti")\
       .withColumnRenamed("nISCEDF", "nazivOblasti")\
       .withColumnRenamed("nTer", "nazivTer")
df.show(5, truncate=False)

+---------------+-----+----------------+------+---------+------------------------------------------+--------+
|idindikator    |IDTer|nazivTer        |godina|IDOblasti|nazivOblasti                              |vrednost|
+---------------+-----+----------------+------+---------+------------------------------------------+--------+
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|2016  |0        |Укупно                                    |3175    |
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|2016  |0        |Генерички програми и квалификације        |0       |
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|2016  |1        |Образовање                                |209     |
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|2016  |2        |Уметности и хуманистичке науке            |562     |
|1104020402IND01|RS   |РЕПУБЛИКА СРБИЈА|2016  |3        |Друштвене науке, новинарство и информисање|396     |
+---------------+-----+----------------+------+---------+------------------------------------------+--------+
only showi

In [0]:
df.printSchema()

root
 |-- idindikator: string (nullable = true)
 |-- IDTer: string (nullable = true)
 |-- nazivTer: string (nullable = true)
 |-- godina: integer (nullable = true)
 |-- IDOblasti: integer (nullable = true)
 |-- nazivOblasti: string (nullable = true)
 |-- vrednost: integer (nullable = true)



In [0]:
df = df.withColumn("nazivOblasti",lower(col("nazivOblasti")))
df.select("nazivOblasti").distinct().show(truncate=False)

+----------------------------------------------+
|nazivOblasti                                  |
+----------------------------------------------+
|информационе и комуникационе технологије (икт)|
|укупно                                        |
|услуге                                        |
|пољопривреда, шумарство, рибарство и ветерина |
|пословање, администрација и право             |
|инжењерство, производња и грађевинарство      |
|здравство и социјална помоћ                   |
|генерички програми и квалификације            |
|природне науке, математика и статистика       |
|друштвене науке, новинарство и информисање    |
|уметности и хуманистичке науке                |
|образовање                                    |
+----------------------------------------------+



In [0]:
df.select("nazivTer").distinct().show(truncate=False)

+--------------------------------+
|nazivTer                        |
+--------------------------------+
|СРБИЈА – ЈУГ                    |
|Регион Шумадије и Западне Србије|
|РЕПУБЛИКА СРБИЈА                |
|Регион Јужне и Источне Србије   |
|Београдски регион               |
|СРБИЈА – СЕВЕР                  |
|Регион Војводине                |
|Регион Косовo и Метохијa        |
+--------------------------------+



In [0]:
arr = ["Srbija - JUG", "Region Sumadije i Zapadne Srbije", "Republika Srbija", "Region Juzne i Istone Srbije", "Beogradski region", "Srbija - SEVER", "Region Vojvodine", "Region KiM"]
ter = df.select("nazivTer").distinct().collect()
ter = [(sub,out["nazivTer"]) for sub,out in zip(arr,ter)]

for pair in ter:
    print(pair)
    df = df.withColumn("nazivTer", regexp_replace("nazivTer",pair[1],pair[0]))
df.select("nazivTer").distinct().show()

('Srbija - JUG', 'СРБИЈА – ЈУГ')
('Region Sumadije i Zapadne Srbije', 'Регион Шумадије и Западне Србије')
('Republika Srbija', 'РЕПУБЛИКА СРБИЈА')
('Region Juzne i Istone Srbije', 'Регион Јужне и Источне Србије')
('Beogradski region', 'Београдски регион')
('Srbija - SEVER', 'СРБИЈА – СЕВЕР')
('Region Vojvodine', 'Регион Војводине')
('Region KiM', 'Регион Косовo и Метохијa')
+--------------------+
|            nazivTer|
+--------------------+
|      Srbija - SEVER|
|    Republika Srbija|
|Region Sumadije i...|
|Region Juzne i Is...|
|          Region KiM|
|        Srbija - JUG|
|   Beogradski region|
|    Region Vojvodine|
+--------------------+



In [0]:
df.show(5, truncate=False)

+---------------+-----+----------------+------+---------+------------------------------------------+--------+
|idindikator    |IDTer|nazivTer        |godina|IDOblasti|nazivOblasti                              |vrednost|
+---------------+-----+----------------+------+---------+------------------------------------------+--------+
|1104020402IND01|RS   |Republika Srbija|2016  |0        |укупно                                    |3175    |
|1104020402IND01|RS   |Republika Srbija|2016  |0        |генерички програми и квалификације        |0       |
|1104020402IND01|RS   |Republika Srbija|2016  |1        |образовање                                |209     |
|1104020402IND01|RS   |Republika Srbija|2016  |2        |уметности и хуманистичке науке            |562     |
|1104020402IND01|RS   |Republika Srbija|2016  |3        |друштвене науке, новинарство и информисање|396     |
+---------------+-----+----------------+------+---------+------------------------------------------+--------+
only showi

Za transformaciju podataka o upisanim studentima po oblastima, radimo slicnu stvar. Prvo menjamo nazive kolona, nazive oblasti setujemo na mala slova, a nazive teritorija prevodimo u latinicu.

In [0]:
file_location = "/FileStore/tables/upisaniOblasti.csv"

df2 = spark.read.option("inferSchema", "true").option("header", "true").csv(file_location, sep=';')
df2.show(5, truncate=False)

+---------------+-----+----------------+---+----+--------+------------------------------------------+--------+
|idindikator    |IDTer|nTer            |mes|god |IDISCEDF|nISCEDF                                   |vrednost|
+---------------+-----+----------------+---+----+--------+------------------------------------------+--------+
|1104010103IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2016|0       |Укупно                                    |201445  |
|1104010103IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2016|0       |Генерички програми и квалификације        |0       |
|1104010103IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2016|1       |Образовање                                |14813   |
|1104010103IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2016|2       |Уметности и хуманистичке науке            |22370   |
|1104010103IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2016|3       |Друштвене науке, новинарство и информисање|24856   |
+---------------+-----+----------------+---+----+--------+------------------------------------------+--------+
o

In [0]:
df2 = df2.withColumnRenamed("IDISCEDF", "ID_Oblasti")\
       .withColumnRenamed("nISCEDF", "naziv_Oblasti")\
       .withColumnRenamed("nTer", "naziv_Ter")\
       .withColumnRenamed("vrednost", "brojUpisanih")\
       .drop("mes")
df2.show(5, truncate=False)

+---------------+-----+----------------+----+----------+------------------------------------------+------------+
|idindikator    |IDTer|naziv_Ter       |god |ID_Oblasti|naziv_Oblasti                             |brojUpisanih|
+---------------+-----+----------------+----+----------+------------------------------------------+------------+
|1104010103IND01|RS   |РЕПУБЛИКА СРБИЈА|2016|0         |Укупно                                    |201445      |
|1104010103IND01|RS   |РЕПУБЛИКА СРБИЈА|2016|0         |Генерички програми и квалификације        |0           |
|1104010103IND01|RS   |РЕПУБЛИКА СРБИЈА|2016|1         |Образовање                                |14813       |
|1104010103IND01|RS   |РЕПУБЛИКА СРБИЈА|2016|2         |Уметности и хуманистичке науке            |22370       |
|1104010103IND01|RS   |РЕПУБЛИКА СРБИЈА|2016|3         |Друштвене науке, новинарство и информисање|24856       |
+---------------+-----+----------------+----+----------+----------------------------------------

In [0]:
df2 = df2.withColumn("naziv_Oblasti",lower(col("naziv_Oblasti")))
df2.select("naziv_Oblasti").distinct().show(truncate=False)

+----------------------------------------------+
|naziv_Oblasti                                 |
+----------------------------------------------+
|информационе и комуникационе технологије (икт)|
|укупно                                        |
|услуге                                        |
|пољопривреда, шумарство, рибарство и ветерина |
|пословање, администрација и право             |
|инжењерство, производња и грађевинарство      |
|здравство и социјална помоћ                   |
|генерички програми и квалификације            |
|природне науке, математика и статистика       |
|друштвене науке, новинарство и информисање    |
|уметности и хуманистичке науке                |
|образовање                                    |
+----------------------------------------------+



In [0]:
df2.select("naziv_Ter").distinct().show()

+--------------------+
|           naziv_Ter|
+--------------------+
|        СРБИЈА – ЈУГ|
|Регион Шумадије и...|
|    РЕПУБЛИКА СРБИЈА|
|Регион Јужне и Ис...|
|   Београдски регион|
|      СРБИЈА – СЕВЕР|
|    Регион Војводине|
|Регион Косовo и М...|
+--------------------+



In [0]:
arr = ["Srbija - JUG", "Region Sumadije i Zapadne Srbije", "Republika Srbija", "Region Juzne i Istone Srbije", "Beogradski region", "Srbija - SEVER", "Region Vojvodine", "Region KiM"]
ter = df2.select("naziv_Ter").distinct().collect()
ter = [(sub,out["naziv_Ter"]) for sub,out in zip(arr,ter)]

for pair in ter:
    print(pair)
    df2 = df2.withColumn("naziv_Ter", regexp_replace("naziv_Ter",pair[1],pair[0]))
df2.select("naziv_Ter").distinct().show()

('Srbija - JUG', 'СРБИЈА – ЈУГ')
('Region Sumadije i Zapadne Srbije', 'Регион Шумадије и Западне Србије')
('Republika Srbija', 'РЕПУБЛИКА СРБИЈА')
('Region Juzne i Istone Srbije', 'Регион Јужне и Источне Србије')
('Beogradski region', 'Београдски регион')
('Srbija - SEVER', 'СРБИЈА – СЕВЕР')
('Region Vojvodine', 'Регион Војводине')
('Region KiM', 'Регион Косовo и Метохијa')
+--------------------+
|           naziv_Ter|
+--------------------+
|      Srbija - SEVER|
|    Republika Srbija|
|Region Sumadije i...|
|Region Juzne i Is...|
|          Region KiM|
|        Srbija - JUG|
|   Beogradski region|
|    Region Vojvodine|
+--------------------+



In [0]:
df2.show(5, truncate=False)

+---------------+-----+----------------+----+----------+------------------------------------------+------------+
|idindikator    |IDTer|naziv_Ter       |god |ID_Oblasti|naziv_Oblasti                             |brojUpisanih|
+---------------+-----+----------------+----+----------+------------------------------------------+------------+
|1104010103IND01|RS   |Republika Srbija|2016|0         |укупно                                    |201445      |
|1104010103IND01|RS   |Republika Srbija|2016|0         |генерички програми и квалификације        |0           |
|1104010103IND01|RS   |Republika Srbija|2016|1         |образовање                                |14813       |
|1104010103IND01|RS   |Republika Srbija|2016|2         |уметности и хуманистичке науке            |22370       |
|1104010103IND01|RS   |Republika Srbija|2016|3         |друштвене науке, новинарство и информисање|24856       |
+---------------+-----+----------------+----+----------+----------------------------------------

Upisujemo podatke.

In [0]:
path = 'dbfs:/projekat/'
csv_name = path + 'dfUpisani.csv'
parquet_name = path + 'dfUpisani.parquet'
orc_name = path + 'dfUpisani.orc'

df2.write.csv(csv_name, header=True)
df2_csv = spark.read.csv(csv_name, header=True, inferSchema=True)
df2_csv.printSchema()

df2.write.save(parquet_name, format='parquet') 
spark.read.load(parquet_name, format='parquet')

root
 |-- idindikator: string (nullable = true)
 |-- IDTer: string (nullable = true)
 |-- naziv_Ter: string (nullable = true)
 |-- god: integer (nullable = true)
 |-- ID_Oblasti: integer (nullable = true)
 |-- naziv_Oblasti: string (nullable = true)
 |-- brojUpisanih: integer (nullable = true)

Out[18]: DataFrame[idindikator: string, IDTer: string, naziv_Ter: string, god: int, ID_Oblasti: int, naziv_Oblasti: string, brojUpisanih: int]

Podatke o diplomiranim studentima po polu najpre splitujemo po ; kako bismo izdvojili kolone. Zatim odbacujemo kolonu mes, jer nam nicemu nece sluziti, menjamo tip vrednosti za kolone "vrednost" i "god", zelimo da budu integer a ne tipa string.
Vrednosti kolone "nazivTer" transformisemo da se cuvaju latinicno, kao i vrednosti za naziv pola (Musko -> M, Zensko ->Z).

In [0]:
file_location = "/FileStore/tables/diplomaPol.csv"
df1 = spark.read.option("inferSchema", "true").option("header", "true").csv(file_location)
df1.show(5)

+--------------------------------------------------+
|idindikator;IDTer;nTer;mes;god;IDPol;nPol;vrednost|
+--------------------------------------------------+
|                              1104020401IND01;R...|
|                              1104020401IND01;R...|
|                              1104020401IND01;R...|
|                              1104020401IND01;R...|
|                              1104020401IND01;R...|
+--------------------------------------------------+
only showing top 5 rows



In [0]:
df1 = df1.withColumn('idindikator', split(df1['idindikator;IDTer;nTer;mes;god;IDPol;nPol;vrednost'], ';').getItem(0)) \
       .withColumn('IDTer', split(df1['idindikator;IDTer;nTer;mes;god;IDPol;nPol;vrednost'], ';').getItem(1)) \
       .withColumn('nTer', split(df1['idindikator;IDTer;nTer;mes;god;IDPol;nPol;vrednost'], ';').getItem(2))\
       .withColumn('mes', split(df1['idindikator;IDTer;nTer;mes;god;IDPol;nPol;vrednost'], ';').getItem(3))\
       .withColumn('god', split(df1['idindikator;IDTer;nTer;mes;god;IDPol;nPol;vrednost'], ';').getItem(4))\
       .withColumn('IDPol', split(df1['idindikator;IDTer;nTer;mes;god;IDPol;nPol;vrednost'], ';').getItem(5))\
       .withColumn('nPol', split(df1['idindikator;IDTer;nTer;mes;god;IDPol;nPol;vrednost'], ';').getItem(6))\
       .withColumn('vrednost', split(df1['idindikator;IDTer;nTer;mes;god;IDPol;nPol;vrednost'], ';').getItem(7))\
       .drop('idindikator;IDTer;nTer;mes;god;IDPol;nPol;vrednost')
df1.show(5, truncate=False)

+---------------+-----+----------------+---+----+-----+------+--------+
|idindikator    |IDTer|nTer            |mes|god |IDPol|nPol  |vrednost|
+---------------+-----+----------------+---+----+-----+------+--------+
|1104020401IND01|RS   |РЕПУБЛИКА СРБИЈА|00 |2016|0    |Укупно|3175    |
|1104020401IND01|RS   |РЕПУБЛИКА СРБИЈА|00 |2016|1    |Мушко |1230    |
|1104020401IND01|RS   |РЕПУБЛИКА СРБИЈА|00 |2016|2    |Женско|1945    |
|1104020401IND01|RS1  |СРБИЈА – СЕВЕР  |00 |2016|0    |Укупно|2279    |
|1104020401IND01|RS1  |СРБИЈА – СЕВЕР  |00 |2016|1    |Мушко |861     |
+---------------+-----+----------------+---+----+-----+------+--------+
only showing top 5 rows



In [0]:
df1 = df1.drop('mes')

In [0]:
df1 = df1.withColumnRenamed("nPol", "nazivPol")\
         .withColumnRenamed("nTer", "nazivTer")
df1.show(5, truncate=False)

+---------------+-----+----------------+----+-----+--------+--------+
|idindikator    |IDTer|nazivTer        |god |IDPol|nazivPol|vrednost|
+---------------+-----+----------------+----+-----+--------+--------+
|1104020401IND01|RS   |РЕПУБЛИКА СРБИЈА|2016|0    |Укупно  |3175    |
|1104020401IND01|RS   |РЕПУБЛИКА СРБИЈА|2016|1    |Мушко   |1230    |
|1104020401IND01|RS   |РЕПУБЛИКА СРБИЈА|2016|2    |Женско  |1945    |
|1104020401IND01|RS1  |СРБИЈА – СЕВЕР  |2016|0    |Укупно  |2279    |
|1104020401IND01|RS1  |СРБИЈА – СЕВЕР  |2016|1    |Мушко   |861     |
+---------------+-----+----------------+----+-----+--------+--------+
only showing top 5 rows



In [0]:
df1.printSchema()
df1 = df1.withColumn("god", df1["god"].cast('integer'))\
         .withColumn("vrednost", df1["vrednost"].cast('integer'))
df1.printSchema()

root
 |-- idindikator: string (nullable = true)
 |-- IDTer: string (nullable = true)
 |-- nazivTer: string (nullable = true)
 |-- god: string (nullable = true)
 |-- IDPol: string (nullable = true)
 |-- nazivPol: string (nullable = true)
 |-- vrednost: string (nullable = true)

root
 |-- idindikator: string (nullable = true)
 |-- IDTer: string (nullable = true)
 |-- nazivTer: string (nullable = true)
 |-- god: integer (nullable = true)
 |-- IDPol: string (nullable = true)
 |-- nazivPol: string (nullable = true)
 |-- vrednost: integer (nullable = true)



In [0]:
df1.select('nazivTer').distinct().show()

+--------------------+
|            nazivTer|
+--------------------+
|        СРБИЈА – ЈУГ|
|Регион Шумадије и...|
|    РЕПУБЛИКА СРБИЈА|
|Регион Јужне и Ис...|
|   Београдски регион|
|      СРБИЈА – СЕВЕР|
|    Регион Војводине|
|Регион Косовo и М...|
+--------------------+



In [0]:
arr = ["Srbija - JUG", "Region Sumadije", "Republika Srbija", "Region Juzne i Istone Srbije", "Beogradski region", "Srbija - SEVER", "Region Vojvodine", "Region KiM"]
ter = df1.select("nazivTer").distinct().collect()
ter = [(sub,out["nazivTer"]) for sub,out in zip(arr,ter)]

for pair in ter:
    print(pair)
    df1 = df1.withColumn("nazivTer", regexp_replace("nazivTer",pair[1],pair[0]))
df1.select("nazivTer").distinct().show()

('Srbija - JUG', 'СРБИЈА – ЈУГ')
('Region Sumadije', 'Регион Шумадије и Западне Србије')
('Republika Srbija', 'РЕПУБЛИКА СРБИЈА')
('Region Juzne i Istone Srbije', 'Регион Јужне и Источне Србије')
('Beogradski region', 'Београдски регион')
('Srbija - SEVER', 'СРБИЈА – СЕВЕР')
('Region Vojvodine', 'Регион Војводине')
('Region KiM', 'Регион Косовo и Метохијa')
+--------------------+
|            nazivTer|
+--------------------+
|      Srbija - SEVER|
|    Republika Srbija|
|     Region Sumadije|
|Region Juzne i Is...|
|          Region KiM|
|        Srbija - JUG|
|   Beogradski region|
|    Region Vojvodine|
+--------------------+



In [0]:
df1.select('nazivPol').distinct().show()

+--------+
|nazivPol|
+--------+
|   Мушко|
|  Укупно|
|  Женско|
+--------+



In [0]:
arr = ["M", "Ukupno", "Z"]
pol = df1.select("nazivPol").distinct().collect()
pol = [(sub,out["nazivPol"]) for sub,out in zip(arr,pol)]

for pair in pol:
    print(pair)
    df1 = df1.withColumn("nazivPol", regexp_replace("nazivPol",pair[1],pair[0]))
df1.select("nazivPol").distinct().show()

('M', 'Мушко')
('Ukupno', 'Укупно')
('Z', 'Женско')
+--------+
|nazivPol|
+--------+
|       M|
|       Z|
|  Ukupno|
+--------+



In [0]:
df1.show(5, truncate=False)

+---------------+-----+----------------+----+-----+--------+--------+
|idindikator    |IDTer|nazivTer        |god |IDPol|nazivPol|vrednost|
+---------------+-----+----------------+----+-----+--------+--------+
|1104020401IND01|RS   |Republika Srbija|2016|0    |Ukupno  |3175    |
|1104020401IND01|RS   |Republika Srbija|2016|1    |M       |1230    |
|1104020401IND01|RS   |Republika Srbija|2016|2    |Z       |1945    |
|1104020401IND01|RS1  |Srbija - SEVER  |2016|0    |Ukupno  |2279    |
|1104020401IND01|RS1  |Srbija - SEVER  |2016|1    |M       |861     |
+---------------+-----+----------------+----+-----+--------+--------+
only showing top 5 rows



Na kraju, cuvamo tako transformisane podatke.

In [0]:
path = 'dbfs:/projekat/'
csv_name = path + 'df.csv'
parquet_name = path + 'df.parquet'
orc_name = path + 'df.orc'

print(path,csv_name,parquet_name,orc_name,sep='\n')

dbfs:/projekat/
dbfs:/projekat/df.csv
dbfs:/projekat/df.parquet
dbfs:/projekat/df.orc


In [0]:
df.write.csv(csv_name, header=True)
df_csv = spark.read.csv(csv_name, header=True)
df_csv.printSchema()
df_csv = spark.read.csv(csv_name, header=True, inferSchema=True)
df_csv.printSchema()

root
 |-- idindikator: string (nullable = true)
 |-- IDTer: string (nullable = true)
 |-- nazivTer: string (nullable = true)
 |-- godina: string (nullable = true)
 |-- IDOblasti: string (nullable = true)
 |-- nazivOblasti: string (nullable = true)
 |-- vrednost: string (nullable = true)

root
 |-- idindikator: string (nullable = true)
 |-- IDTer: string (nullable = true)
 |-- nazivTer: string (nullable = true)
 |-- godina: integer (nullable = true)
 |-- IDOblasti: integer (nullable = true)
 |-- nazivOblasti: string (nullable = true)
 |-- vrednost: integer (nullable = true)



In [0]:
df.write.save(parquet_name, format='parquet') 
spark.read.load(parquet_name, format='parquet')

Out[32]: DataFrame[idindikator: string, IDTer: string, nazivTer: string, godina: int, IDOblasti: int, nazivOblasti: string, vrednost: int]

In [0]:
path = 'dbfs:/projekat/'
csv_name = path + 'dfPol.csv'
parquet_name = path + 'dfPol.parquet'
orc_name = path + 'dfPol.orc'

df1.write.csv(csv_name, header=True)
df1_csv = spark.read.csv(csv_name, header=True)
df1_csv.printSchema()
df1_csv = spark.read.csv(csv_name, header=True, inferSchema=True)
df1_csv.printSchema()

root
 |-- idindikator: string (nullable = true)
 |-- IDTer: string (nullable = true)
 |-- nazivTer: string (nullable = true)
 |-- god: string (nullable = true)
 |-- IDPol: string (nullable = true)
 |-- nazivPol: string (nullable = true)
 |-- vrednost: string (nullable = true)

root
 |-- idindikator: string (nullable = true)
 |-- IDTer: string (nullable = true)
 |-- nazivTer: string (nullable = true)
 |-- god: integer (nullable = true)
 |-- IDPol: integer (nullable = true)
 |-- nazivPol: string (nullable = true)
 |-- vrednost: integer (nullable = true)



In [0]:
df1.write.save(parquet_name, format='parquet') 
spark.read.load(parquet_name, format='parquet')

Out[34]: DataFrame[idindikator: string, IDTer: string, nazivTer: string, god: int, IDPol: string, nazivPol: string, vrednost: int]

Na podatke o upisanim studentima po polu primenjujemo slicne transformacije kao na gore pomenutim podacima; menjamo nazive kolona, sa cirilice prebacujemo na latinicu nazive teritorija, takodje prilagodjavamo naziv pola.

In [0]:
file_location = "/FileStore/tables/upisaniPol.csv"

df3 = spark.read.option("inferSchema", "true").option("header", "true").csv(file_location, sep=';')
df3.show(5, truncate=False)

+-------------+-----+----------------+---+----+-----+------+--------+
|idindikator  |IDTer|nTer            |mes|god |IDPol|nPol  |vrednost|
+-------------+-----+----------------+---+----+-----+------+--------+
|11040101IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2007|0    |Укупно|145493  |
|11040101IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2007|1    |Мушко |68636   |
|11040101IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2007|2    |Женско|76857   |
|11040101IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2008|0    |Укупно|161038  |
|11040101IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2008|1    |Мушко |75844   |
+-------------+-----+----------------+---+----+-----+------+--------+
only showing top 5 rows



In [0]:
df3 = df3.withColumnRenamed("vrednost", "brojUpisanih")\
         .withColumnRenamed("god", "godina")
df3.show(5, truncate=False)

+-------------+-----+----------------+---+------+-----+------+------------+
|idindikator  |IDTer|nTer            |mes|godina|IDPol|nPol  |brojUpisanih|
+-------------+-----+----------------+---+------+-----+------+------------+
|11040101IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2007  |0    |Укупно|145493      |
|11040101IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2007  |1    |Мушко |68636       |
|11040101IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2007  |2    |Женско|76857       |
|11040101IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2008  |0    |Укупно|161038      |
|11040101IND01|RS   |РЕПУБЛИКА СРБИЈА|0  |2008  |1    |Мушко |75844       |
+-------------+-----+----------------+---+------+-----+------+------------+
only showing top 5 rows



In [0]:
df3 = df3.drop("mes")
df3.show(5)

+-------------+-----+----------------+------+-----+------+------------+
|  idindikator|IDTer|            nTer|godina|IDPol|  nPol|brojUpisanih|
+-------------+-----+----------------+------+-----+------+------------+
|11040101IND01|   RS|РЕПУБЛИКА СРБИЈА|  2007|    0|Укупно|      145493|
|11040101IND01|   RS|РЕПУБЛИКА СРБИЈА|  2007|    1| Мушко|       68636|
|11040101IND01|   RS|РЕПУБЛИКА СРБИЈА|  2007|    2|Женско|       76857|
|11040101IND01|   RS|РЕПУБЛИКА СРБИЈА|  2008|    0|Укупно|      161038|
|11040101IND01|   RS|РЕПУБЛИКА СРБИЈА|  2008|    1| Мушко|       75844|
+-------------+-----+----------------+------+-----+------+------------+
only showing top 5 rows



In [0]:
arr = ["M", "Ukupno", "Z"]
pol = df3.select("nPol").distinct().collect()
pol = [(sub,out["nPol"]) for sub,out in zip(arr,pol)]

for pair in pol:
    df3 = df3.withColumn("nPol", regexp_replace("nPol",pair[1],pair[0]))
df3.select("nPol").distinct().show()

+------+
|  nPol|
+------+
|     M|
|     Z|
|Ukupno|
+------+



In [0]:
arr = ["Srbija - JUG", "Region Sumadije", "Republika Srbija", "Region Juzne i Istone Srbije", "Beogradski region", "Srbija - SEVER", "Region Vojvodine", "Region KiM"]
ter = df3.select("nTer").distinct().collect()
ter = [(sub,out["nTer"]) for sub,out in zip(arr,ter)]

for pair in ter:
    print(pair)
    df3 = df3.withColumn("nTer", regexp_replace("nTer",pair[1],pair[0]))
df3.select("nTer").distinct().show()

('Srbija - JUG', 'СРБИЈА – ЈУГ')
('Region Sumadije', 'Регион Шумадије и Западне Србије')
('Republika Srbija', 'РЕПУБЛИКА СРБИЈА')
('Region Juzne i Istone Srbije', 'Регион Јужне и Источне Србије')
('Beogradski region', 'Београдски регион')
('Srbija - SEVER', 'СРБИЈА – СЕВЕР')
('Region Vojvodine', 'Регион Војводине')
('Region KiM', 'Регион Косовo и Метохијa')
+--------------------+
|                nTer|
+--------------------+
|      Srbija - SEVER|
|    Republika Srbija|
|     Region Sumadije|
|Region Juzne i Is...|
|          Region KiM|
|        Srbija - JUG|
|   Beogradski region|
|    Region Vojvodine|
+--------------------+



In [0]:
df3.show(5)

+-------------+-----+----------------+------+-----+------+------------+
|  idindikator|IDTer|            nTer|godina|IDPol|  nPol|brojUpisanih|
+-------------+-----+----------------+------+-----+------+------------+
|11040101IND01|   RS|Republika Srbija|  2007|    0|Ukupno|      145493|
|11040101IND01|   RS|Republika Srbija|  2007|    1|     M|       68636|
|11040101IND01|   RS|Republika Srbija|  2007|    2|     Z|       76857|
|11040101IND01|   RS|Republika Srbija|  2008|    0|Ukupno|      161038|
|11040101IND01|   RS|Republika Srbija|  2008|    1|     M|       75844|
+-------------+-----+----------------+------+-----+------+------------+
only showing top 5 rows



Cuvamo podatke, s tim sto ih i particionisemo s obzirom da cemo ove podatke koristiti i za streaming.

In [0]:
path = 'dbfs:/projekat/'
csv_name3 = path + 'dfUpisaniPol.csv'
df3.repartition(10).write.mode("overwrite").csv(csv_name3, header=True)

In [0]:
display(dbutils.fs.ls(csv_name3))

path,name,size
dbfs:/projekat/dfUpisaniPol.csv/_SUCCESS,_SUCCESS,0
dbfs:/projekat/dfUpisaniPol.csv/_committed_7292930720076131006,_committed_7292930720076131006,914
dbfs:/projekat/dfUpisaniPol.csv/_started_7292930720076131006,_started_7292930720076131006,0
dbfs:/projekat/dfUpisaniPol.csv/part-00000-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-173-1-c000.csv,part-00000-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-173-1-c000.csv,1440
dbfs:/projekat/dfUpisaniPol.csv/part-00001-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-174-1-c000.csv,part-00001-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-174-1-c000.csv,1421
dbfs:/projekat/dfUpisaniPol.csv/part-00002-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-175-1-c000.csv,part-00002-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-175-1-c000.csv,1434
dbfs:/projekat/dfUpisaniPol.csv/part-00003-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-176-1-c000.csv,part-00003-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-176-1-c000.csv,1471
dbfs:/projekat/dfUpisaniPol.csv/part-00004-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-177-1-c000.csv,part-00004-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-177-1-c000.csv,1402
dbfs:/projekat/dfUpisaniPol.csv/part-00005-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-178-1-c000.csv,part-00005-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-178-1-c000.csv,1390
dbfs:/projekat/dfUpisaniPol.csv/part-00006-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-179-1-c000.csv,part-00006-tid-7292930720076131006-718fbf44-bdf9-4a24-8b61-5012b9d57069-179-1-c000.csv,1423
