# OCI Data Science - Useful Tips
<details>
<summary><font size="2">Check for Public Internet Access</font></summary>

```python
import requests
response = requests.get("https://oracle.com")
assert response.status_code==200, "Internet connection failed"
```
</details>
<details>
<summary><font size="2">Helpful Documentation </font></summary>
<ul><li><a href="https://docs.cloud.oracle.com/en-us/iaas/data-science/using/data-science.htm">Data Science Service Documentation</a></li>
<li><a href="https://docs.cloud.oracle.com/iaas/tools/ads-sdk/latest/index.html">ADS documentation</a></li>
</ul>
</details>
<details>
<summary><font size="2">Typical Cell Imports and Settings for ADS</font></summary>

```python
%load_ext autoreload
%autoreload 2
%matplotlib inline

import warnings
warnings.filterwarnings('ignore')

import logging
logging.basicConfig(format='%(levelname)s:%(message)s', level=logging.ERROR)

import ads
from ads.dataset.factory import DatasetFactory
from ads.automl.provider import OracleAutoMLProvider
from ads.automl.driver import AutoML
from ads.evaluations.evaluator import ADSEvaluator
from ads.common.data import ADSData
from ads.explanations.explainer import ADSExplainer
from ads.explanations.mlx_global_explainer import MLXGlobalExplainer
from ads.explanations.mlx_local_explainer import MLXLocalExplainer
from ads.catalog.model import ModelCatalog
from ads.common.model_artifact import ModelArtifact
```
</details>
<details>
<summary><font size="2">Useful Environment Variables</font></summary>

```python
import os
print(os.environ["NB_SESSION_COMPARTMENT_OCID"])
print(os.environ["PROJECT_OCID"])
print(os.environ["USER_OCID"])
print(os.environ["TENANCY_OCID"])
print(os.environ["NB_REGION"])
```
</details>

### 1. Set up spark context and SparkSession

In [1]:
from pyspark.sql import SparkSession
import time

# VALIDATION_FILE_PATH: Validation data path that needs to be predicted

### Running whole code from beginning to end may last 18 minute

In [2]:
# Validation data path that contains data to predict
VALIDATION_DATA_PATH = "TICKET_VALIDATION_DATA.csv"

In [3]:
# final path that will contain predicted values.
VALIDATION_DATA_OUTPUT_PATH = 'FRP_hamsi.csv'

In [4]:
path = "TICKET.csv"

TRAINING_DATA_PATH = "TICKET_TRAIN.csv"

In [5]:

###
import numpy as np
import pandas as pd
from itertools import chain
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import functions as F
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
###



In [6]:
spark = SparkSession \
        .builder \
        .appName("Ucuş tipi tahminleme multinomial logical regression") \
        .getOrCreate()

#.config("spark.some.config.option", "some.value") \

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/datascience/spark_conf_dir/oci-hdfs-full-3.3.0.6.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/datascience/conda/pyspark30_p37_cpu_v2/lib/python3.7/site-packages/pyspark/jars/slf4j-log4j12-1.7.30.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]


### 2. Load dataset

In [7]:
# datas to train the model
df_train = spark.read.format('com.databricks.spark.csv') \
            .options(header='true', inferschema='true') \
            .load(TRAINING_DATA_PATH, header=True);

                                                                                

In [8]:
# reading validation data
df_validation = spark.read.format('com.databricks.spark.csv') \
            .options(header='true', inferschema='true') \
            .load(VALIDATION_DATA_PATH, header=True);

                                                                                

In [9]:
df = df_train.union(df_validation)

#df.show(5)

#producing the dataFrame

In [10]:
from pyspark.sql.functions import col, explode, array, lit

In [11]:
"""df.filter(col("FLIGHT_REASON") != "NONE").repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save('TICKET_TRAIN.csv')
df.filter(col("FLIGHT_REASON") == "NONE").repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save('TICKET_VALIDATION.csv')"""


'df.filter(col("FLIGHT_REASON") != "NONE").repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save(\'TICKET_TRAIN.csv\')\ndf.filter(col("FLIGHT_REASON") == "NONE").repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save(\'TICKET_VALIDATION.csv\')'

In [12]:
# Extracting datas on OND column
def ond_to_dep(x):
    return str(str(x).split("-")[0])

def ond_to_arr(x):
    return str(str(x).split("-")[1])

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType, IntegerType

ond_to_dep_udf = udf(lambda x: ond_to_dep(x), StringType())
df = df.withColumn("DEP_CITY", ond_to_dep_udf("OND"))

ond_to_arr_udf = udf(lambda x: ond_to_arr(x), StringType())
df = df.withColumn("ARR_CITY", ond_to_arr_udf("OND"))

In [13]:
from pyspark.sql.functions import mean as _mean, stddev as _stddev, col

df_stats = df.select(
    _mean(col('CUST_AGE')).alias('mean')
).collect()

year_mean = df_stats[0]['mean']
#year_std = df_stats[0]['std']

df = df.fillna(year_mean, subset=["CUST_AGE"])

# empty cells on CUST_AGE are fillling with average value of CUST_AGE column



                                                                                

In [14]:
df = df.drop("NTNLT1") # Removing NTNLT1 column since most of it's cells are N/A

df = df.na.drop()

In [15]:
df.groupBy("FLIGHT_REASON").count().toPandas()

                                                                                

Unnamed: 0,FLIGHT_REASON,count
0,LEISURE,576739
1,SECOND HOME,259011
2,STUDENT,1228
3,BUSINESS,458593
4,NONE,8911797


In [16]:
# copying STUDENT flight type rows on FLIGHT_REASON column because there's not many STUDENT flight type when compared with other ones

from pyspark.sql.functions import col, explode, array, lit

major_df = df.filter(col("FLIGHT_REASON") != "STUDENT")
minor_df = df.filter(col("FLIGHT_REASON") == "STUDENT")
ratio = int(major_df.count()/minor_df.count()/3)
print("ratio: {}".format(ratio))



ratio: 2770


                                                                                

In [17]:
start = time.time()

a = range(ratio)
# duplicate the minority rows
oversampled_df = minor_df.withColumn("dummy", explode(array([lit(x) for x in a]))).drop('dummy')

# combine both oversampled minority rows and previous majority rows 
combined_df = major_df.unionAll(oversampled_df)
df = combined_df

end = time.time()
print(end - start)

6.039738893508911


In [18]:
df.groupBy("FLIGHT_REASON").count().toPandas()

                                                                                

Unnamed: 0,FLIGHT_REASON,count
0,LEISURE,576739
1,SECOND HOME,259011
2,STUDENT,3401560
3,BUSINESS,458593
4,NONE,8911797


In [19]:
# Convert to float format
def string_to_float(x):
    return float(x)

def date_to_year(x):
    return int(str(x)[0:4])

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, DoubleType, IntegerType


date_to_year_udf = udf(lambda x: date_to_year(x), IntegerType())
df = df.withColumn("ID_PNR_CREATION_YEAR", date_to_year_udf("ID_PNR_CREATION_YMD"))

# producing ID_PNR_CREATION_YEAR column with using ID_PNR_CREATION_YMD.

In [20]:
# a function which converts the columns hasn't digital values to a shape that StringIndexer and VectorIndexer can understand
def get_dummy(df,categoricalCols,continuousCols,labelCol):
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.sql.functions import col
    indexers = [ StringIndexer(inputCol=c, outputCol="{0}_indexed".format(c)) for c in categoricalCols ]
    # default setting: dropLast=True
    encoders = [ OneHotEncoder(inputCol=indexer.getOutputCol(),outputCol="{0}_encoded".format(indexer.getOutputCol())) for indexer in indexers ]
    assembler = VectorAssembler(inputCols=[encoder.getOutputCol() for encoder in encoders] + continuousCols, outputCol="features")
    pipeline = Pipeline(stages=indexers + encoders + [assembler])
    model=pipeline.fit(df)
    data = model.transform(df)
    data = data.withColumn('label',col(labelCol))
    return data.select('features','label')

In [21]:
categoricalCols = ["DEP_CITY","ARR_CITY"]
continuousCols = ['PNR_PSSG_COUNT','CHILD_FLG','INFANT_FLG','POS_POC_SAME_FLG','SAME_SRNAME_FLG','FAMILY_FLG','SEAT_SELECT_FLG','XBAG_FIRST_FLT_FLG','XBAG_LAST_FLT_FLG','XBAG_TWO_WAY_FLT_FLG','SPORT_FLG','XBAG_FLG','PET_FLG','CUST_AGE','ID_PNR_CREATION_YEAR']
labelCol = "FLIGHT_REASON"

# columns that are useful for machine learning

df_features =  get_dummy(df,categoricalCols,continuousCols,labelCol)

                                                                                

### 4. Transform the dataset to DataFrame

In [22]:
from pyspark.ml.linalg import Vectors # !!!!caution: NOT from pyspark.mllib.linalg import Vectors
from pyspark.ml import Pipeline
from pyspark.ml.feature import IndexToString,StringIndexer, VectorIndexer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def transData(data):
    return data.rdd.map(lambda r: [Vectors.dense(r[:-1]),r[-1]]).toDF(['features','label'])

In [23]:
# Index labels, adding metadata to the label column

labelIndexer = StringIndexer(inputCol='label', outputCol='indexedLabel').fit(df_features)

labelIndexer.transform(df_features).show(5, True)

                                                                                

+--------------------+--------+------------+
|            features|   label|indexedLabel|
+--------------------+--------+------------+
|(151,[16,68,136,1...|BUSINESS|         3.0|
|(151,[0,73,136,13...|BUSINESS|         3.0|
|(151,[0,82,136,14...|BUSINESS|         3.0|
|(151,[1,68,136,14...|BUSINESS|         3.0|
|(151,[0,112,136,1...|BUSINESS|         3.0|
+--------------------+--------+------------+
only showing top 5 rows



In [24]:
# Automatically identify categorical features, and index them.

featureIndexer =VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(df_features)
featureIndexer.transform(df_features).show(5, True)

                                                                                

+--------------------+--------+--------------------+
|            features|   label|     indexedFeatures|
+--------------------+--------+--------------------+
|(151,[16,68,136,1...|BUSINESS|(151,[16,68,136,1...|
|(151,[0,73,136,13...|BUSINESS|(151,[0,73,136,13...|
|(151,[0,82,136,14...|BUSINESS|(151,[0,82,136,14...|
|(151,[1,68,136,14...|BUSINESS|(151,[1,68,136,14...|
|(151,[0,112,136,1...|BUSINESS|(151,[0,112,136,1...|
+--------------------+--------+--------------------+
only showing top 5 rows



Traceback (most recent call last):
  File "/home/datascience/conda/pyspark30_p37_cpu_v2/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/home/datascience/conda/pyspark30_p37_cpu_v2/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/home/datascience/conda/pyspark30_p37_cpu_v2/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 642, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/home/datascience/conda/pyspark30_p37_cpu_v2/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    raise EOFError
EOFError


In [25]:
from pyspark.sql.functions import monotonically_increasing_id

In [26]:
df_features = df_features.withColumn("id", monotonically_increasing_id())

df = df.withColumn("id", monotonically_increasing_id())

df_features_cols = df_features.columns
df_features_cols.remove('id')

df_cols = df.columns
df_cols.remove('id')

df_merged = df.join(df_features, df["id"] == df_features["id"])

Traceback (most recent call last):
  File "/home/datascience/conda/pyspark30_p37_cpu_v2/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 186, in manager
  File "/home/datascience/conda/pyspark30_p37_cpu_v2/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/daemon.py", line 74, in worker
  File "/home/datascience/conda/pyspark30_p37_cpu_v2/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 642, in main
    if read_int(infile) == SpecialLengths.END_OF_STREAM:
  File "/home/datascience/conda/pyspark30_p37_cpu_v2/lib/python3.7/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    raise EOFError
EOFError


### 5. Split the data to training and test data sets

In [27]:
# data is splitting to train and validation

(df_merged_train, df_merged_test) = df_merged.filter(df_merged["FLIGHT_REASON"] != "NONE"), df_merged.filter(df_merged["FLIGHT_REASON"] == "NONE")


 
(trainingData, testData) = df_merged.filter(df_merged["FLIGHT_REASON"] != "NONE").select(df_features_cols), df_merged.filter(df_merged["FLIGHT_REASON"] == "NONE").select(df_features_cols)
(trainingDF, testDF) = df_merged.filter(df_merged["FLIGHT_REASON"] != "NONE").select(df_cols), df_merged.filter(df_merged["FLIGHT_REASON"] == "NONE").select(df_cols)


### 6. Fit Multinomial logisticRegression Classification Model

In [28]:
# applying logistic regression since the value we need to predict has only four possible result.

from pyspark.ml.classification import LogisticRegression
logr = LogisticRegression(featuresCol='indexedFeatures', labelCol = 'indexedLabel')

### 

In [29]:
# Convert indexed labels back to original labels.
labelConverter = IndexToString(inputCol="prediction", outputCol = "predictedLabel",labels=labelIndexer.labels)

In [30]:
# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, logr, labelConverter])

In [31]:
# Training the model with train data.
model = pipeline.fit(trainingData)

                                                                                

### 8. Make predictions

In [32]:
# predicting
predictions = model.transform(testData)

### 9. Evaluation

#### Şimdi elimizde validasyon verisinin her bir satırı için bir tahmin var. Ancak bu veri 
#### ile validasyon verisi ayrıldığı için csv dosyasına yazdırmak mümkün olmadığından dolayı 
#### iki DataFrame'i birleştirmemiz gerek. Böylece csv dosyasına yazdırma işlemi tamamlanabilir.

In [33]:
testDF = testDF.withColumn("id2", monotonically_increasing_id())

predictions = predictions.withColumn("id2", monotonically_increasing_id())

In [34]:
finalTest = testDF.select(['PNR_NO', 'ID_TKT_NO','ID_PNR_CREATION_YMD','id2']).join(predictions.select('predictedLabel','id2'), testDF["id2"] == predictions["id2"])

finalTest = finalTest.withColumnRenamed('predictedLabel','FLIGHT_REASON')

finalTest = finalTest.drop("id2")
finalTest = finalTest.sort(finalTest.ID_TKT_NO.asc())

In [35]:
finalTest.repartition(1).write.format("com.databricks.spark.csv").option("header", "true").save(VALIDATION_DATA_OUTPUT_PATH)

                                                                                