Goal: to apply machine learning methods to predict atmospheric pollution from the following data:
        https://www.kaggle.com/unitednations/international-greenhouse-gas-emissions
using the decision tree and the Spark large database tool (PySpark for python)

Цель: применить методы машинного обучения для предсказывания загрязнений атмосферы по данным:
        https://www.kaggle.com/unitednations/international-greenhouse-gas-emissions
используя дерево решений и инструмент для работы с большими базами данных Spark (PySpark для python)

In [3]:
#import necessary libraries
#портируем необходимые библиотеки
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
from pyspark.ml import Pipeline
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.mllib.tree import DecisionTree, DecisionTreeModel
from pyspark.ml.feature import StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
import pyspark.sql.functions as sf
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import GBTRegressor
#other types of regression models
#можно использовать и другие виды регрессии
#from pyspark.ml.regression import LinearRegression
#from pyspark.ml.regression import RandomForestRegressor
#from pyspark.ml.regression import GeneralizedLinearRegression
#from pyspark.ml.regression import DecisionTreeRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator

In [5]:
#create spark session
#создаём спарк сессею
sc = SparkContext('local')
spark = SparkSession(sc)
spark

In [6]:
#load .csv data from path_csv
#загружаем данные формата .csv из path_csv
path_csv = 'greenhouse_gas_inventory_data_data.csv'
data = spark.read.format("csv")\
        .option("header", "true")\
        .option("delimiter", ",")\
        .option("inferSchema", "true")\
        .load(path_csv)

In [17]:
#count of rows in the dataset
#количество строк данных
data.select('year').count()

8406

In [7]:
#show part of data
#посмотрим на часть данных
data.show()

+---------------+----+----------------+--------------------+
|country_or_area|year|           value|            category|
+---------------+----+----------------+--------------------+
|      Australia|2014|393126.946994288|carbon_dioxide_co...|
|      Australia|2013| 396913.93653029|carbon_dioxide_co...|
|      Australia|2012|  406462.8477036|carbon_dioxide_co...|
|      Australia|2011|403705.528313991|carbon_dioxide_co...|
|      Australia|2010|406200.993184341|carbon_dioxide_co...|
|      Australia|2009| 408448.47899963|carbon_dioxide_co...|
|      Australia|2008|404237.828214077|carbon_dioxide_co...|
|      Australia|2007|398816.453543549|carbon_dioxide_co...|
|      Australia|2006|391134.100909449|carbon_dioxide_co...|
|      Australia|2005|385581.132806466|carbon_dioxide_co...|
|      Australia|2004|381519.261592783|carbon_dioxide_co...|
|      Australia|2003|368345.977425107|carbon_dioxide_co...|
|      Australia|2002|361861.387896028|carbon_dioxide_co...|
|      Australia|2001|35

In [8]:
#show all varients of category column values
#посмотрим на варианты значений колонки category
data.select("category").distinct().show(10, False)

+--------------------------------------------------------------------------------------------------------------+
|category                                                                                                      |
+--------------------------------------------------------------------------------------------------------------+
|nitrogen_trifluoride_nf3_emissions_in_kilotonne_co2_equivalent                                                |
|unspecified_mix_of_hydrofluorocarbons_hfcs_and_perfluorocarbons_pfcs_emissions_in_kilotonne_co2_equivalent    |
|methane_ch4_emissions_without_land_use_land_use_change_and_forestry_lulucf_in_kilotonne_co2_equivalent        |
|nitrous_oxide_n2o_emissions_without_land_use_land_use_change_and_forestry_lulucf_in_kilotonne_co2_equivalent  |
|greenhouse_gas_ghgs_emissions_including_indirect_co2_without_lulucf_in_kilotonne_co2_equivalent               |
|greenhouse_gas_ghgs_emissions_without_land_use_land_use_change_and_forestry_lulucf_in_kilotonne

In [9]:
#show types of columns
#посмотрим на типы всех наших колонок
data.printSchema()

root
 |-- country_or_area: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- value: double (nullable = true)
 |-- category: string (nullable = true)



In [12]:
#value is dependent and predicted  - label
#value - это зависимая и предсказываемая переменная - метка
stages = []
label_stringIdx = StringIndexer(inputCol = 'value', outputCol = 'label', handleInvalid = 'keep')
stages += [label_stringIdx]

#depend on categorical columns: country and types of emission
#зависит от категориаьных колонок: страны и категории загрязнения
categoricalColumns = ['country_or_area', 'category']
for categoricalCol in categoricalColumns:
    #convert categorical columns to binary vectors through string conventer
    #преобразование категориальных колонок в бинарные вектора благодаря строковому преобразователю
    stringIndexer = StringIndexer(inputCol = categoricalCol,
                                  outputCol = categoricalCol + 'Index',
                                  handleInvalid = 'keep')
    encoder = OneHotEncoder(inputCol=stringIndexer.getOutputCol(),
                            outputCol=categoricalCol + "classVec")
    stages += [stringIndexer, encoder]

#depend on numeric column: year
#зависит от численной колонки: года
numericCols = ['year']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
#convert multiple columns into a vector column - features
#преобразование нескольких колонок в вектор-колонку - признаки
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]

In [13]:
# Split the data into training and test sets (30% held out for testing)
#делим данные на обучающую и тестовую выборки (30% тестовая)
(trainingData, testData) = data.randomSplit([0.7, 0.3])

# еrain a GBT ( gradient boosting tree regression) model
#тренируем модель (градиентного регрессионого дерева бустинга)
gbt = GBTRegressor(labelCol="label", featuresCol="features", maxIter=10)
stages += [gbt]

# give a plan (stages) model training 
# задаем план stages для обучения модели 
pipeline = Pipeline(stages=stages)


In [14]:
# train model
# тренируем модель
model = pipeline.fit(trainingData)

# make predictions on test dataset
# делаем предсказания на тестовой выборке
predictions = model.transform(testData)


In [16]:
# Select (prediction, true label) and compute test error
# выбираем предсказанное и истинное значение и считаем ошибку
evaluator = RegressionEvaluator(
    labelCol="label", predictionCol="prediction", metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE) on test data = %g" % rmse)
gbtModel = model.stages[-1]
print(gbtModel)

Root Mean Squared Error (RMSE) on test data = 2801.75
GBTRegressionModel: uid=GBTRegressor_a6fe876dd0b5, numTrees=10, numFeatures=54


In [None]:
#gbt 2717.59
#lin_regr 2737.69
#rand_for_regr 2751.47
#dec_tree_regr 2709.51
#gen_lin_regr 2723.14

In [66]:
#save model
#сохраняем модель
pipeline.write().overwrite().save('model/gbtregr_model_1')

In [None]:
#load model
#загружаем модель для работы после обучения
load_model = pipeline.read().load('model/gbtregr_model_1') 

                                               Conclusion
                                                 Вывод

the error turned out to be very large, however, the goal to improve the methodology of working in PySpark on the example of pollution data in different countries in the period from 1990-2017 with several columns of different types was completed. such an error value may be associated with a small dataset (only 8.4 K) and the use of solving trees for regression problems

ошибка получилась очень большой, однако, цель разбаботать методику работы в PySpark 
на примере данных загрязнений в разных странах в перод времени с 1990-2017 
с несколькими колонками разных типов выполнена.
подобная величина ошибки может быть связана с маленьким датасетом (всего 8,4 к)
и применением решаюшщих деревьев для задач регрессии