In [1]:
import os

import findspark

# Make the local Spark installation importable. SPARK_HOME, when set, takes
# precedence; otherwise the author's original install location is used.
findspark.init(os.environ.get('SPARK_HOME', '/home/mocatfrio/spark'))

In [2]:
from pyspark.sql import SparkSession

In [3]:
# Create the Spark session for this notebook (or reuse an existing one),
# registered under the application name "fc".
spark = (
    SparkSession.builder
    .appName("fc")
    .getOrCreate()
)

In [4]:
# Print the SparkSession object created above.
print(str(spark))

<pyspark.sql.session.SparkSession object at 0x7fa33a272da0>


# Load Dataset

In [5]:
import os

# Directory that contains covtype.csv. Set FC_DATASET_PATH to override it; the
# default is the author's original location. The alternative path that used to
# be left commented out here was
# '/home/mocatfrio/Documents/tugas-akhir/app/dataset-generator'.
dataset_path = os.environ.get(
    'FC_DATASET_PATH',
    '/Users/mocatfrio/Documents/thesis/tugas-akhir/App/dataset-generator',
)

In [6]:
# Read covtype.csv as a comma-separated file whose first line is the header,
# letting Spark infer the column types.
fc_file = os.path.join(dataset_path, 'covtype.csv')
fc_df = (
    spark.read
    .format("csv")
    .option("sep", ",")
    .option("inferSchema", "true")
    .option("header", "true")
    .load(fc_file)
)

In [7]:
# Preview the first 20 rows (20 is DataFrame.show()'s default row count).
fc_df.show(n=20)

+---------+------+-----+--------------------------------+------------------------------+-------------------------------+-------------+--------------+-------------+----------------------------------+----------------+----------------+----------------+----------------+----------+----------+----------+----------+----------+----------+----------+----------+----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+-----------+----------+
|Elevation|Aspect|Slope|Horizontal_Distance_To_Hydrology|Vertical_Distance_To_Hydrology|Horizontal_Distance_To_Roadways|Hillshade_9am|Hillshade_Noon|Hillshade_3pm|Horizontal_Distance_To_Fire_Points|Wilderness_Area1|Wilderness_Area2|Wilderness_Area3|Wi

In [8]:
# Total number of rows loaded from covtype.csv.
fc_df.count()

581012

In [9]:
# Column names and the types produced by inferSchema.
fc_df.printSchema()

root
 |-- Elevation: integer (nullable = true)
 |-- Aspect: integer (nullable = true)
 |-- Slope: integer (nullable = true)
 |-- Horizontal_Distance_To_Hydrology: integer (nullable = true)
 |-- Vertical_Distance_To_Hydrology: integer (nullable = true)
 |-- Horizontal_Distance_To_Roadways: integer (nullable = true)
 |-- Hillshade_9am: integer (nullable = true)
 |-- Hillshade_Noon: integer (nullable = true)
 |-- Hillshade_3pm: integer (nullable = true)
 |-- Horizontal_Distance_To_Fire_Points: integer (nullable = true)
 |-- Wilderness_Area1: integer (nullable = true)
 |-- Wilderness_Area2: integer (nullable = true)
 |-- Wilderness_Area3: integer (nullable = true)
 |-- Wilderness_Area4: integer (nullable = true)
 |-- Soil_Type1: integer (nullable = true)
 |-- Soil_Type2: integer (nullable = true)
 |-- Soil_Type3: integer (nullable = true)
 |-- Soil_Type4: integer (nullable = true)
 |-- Soil_Type5: integer (nullable = true)
 |-- Soil_Type6: integer (nullable = true)
 |-- Soil_Type7: integer

In [10]:
# Expose fc_df to Spark SQL under the view name `covtype`.
fc_df.createOrReplaceTempView(name="covtype")

# Preprocess
## 3 Dimensi

In [11]:
# Retrieve the data needed:
# - Slope as ts in
# - Aspect as ts out
# - the other selected columns as dimensions
# Only rows where `Slope` < `Aspect` are kept.
#
# `id` is a 1-based row number ordered by monotonically_increasing_id(), which
# increases with the partition index and the position of each scanned row, so
# the same input and partitioning always get the same ids. (The previous
# `ORDER BY NULL` left the row order unspecified, so ids could change between runs.)
# `Label` is a placeholder copy of `id` that reserves the column position; it is
# replaced with a string label in a later step.
data = spark.sql("""
    SELECT `id`, `id` AS `Label`,
           `Slope`, `Aspect`, `Elevation`, `Horizontal_Distance_To_Hydrology`,
           `Horizontal_Distance_To_Roadways`, `Hillshade_9am`, `Hillshade_Noon`,
           `Hillshade_3pm`, `Horizontal_Distance_To_Fire_Points`
    FROM (
        SELECT ROW_NUMBER() OVER (ORDER BY monotonically_increasing_id()) AS `id`, *
        FROM covtype
        WHERE `Slope` < `Aspect`
    ) AS numbered
""")


In [12]:
# Preview the first 20 rows of the filtered, numbered data.
data.show(n=20)

+---+-----+-----+------+---------+--------------------------------+-------------------------------+-------------+--------------+-------------+----------------------------------+
| id|Label|Slope|Aspect|Elevation|Horizontal_Distance_To_Hydrology|Horizontal_Distance_To_Roadways|Hillshade_9am|Hillshade_Noon|Hillshade_3pm|Horizontal_Distance_To_Fire_Points|
+---+-----+-----+------+---------+--------------------------------+-------------------------------+-------------+--------------+-------------+----------------------------------+
|  1|    1|    3|    51|     2596|                             258|                            510|          221|           232|          148|                              6279|
|  2|    2|    2|    56|     2590|                             212|                            390|          220|           235|          151|                              6225|
|  3|    3|    9|   139|     2804|                             268|                           3180|          2

In [13]:
# Number of rows kept by the `Slope` < `Aspect` filter.
data.count()

546298

In [14]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

def _prefixed_label(prefix, value):
    """Return ``prefix`` immediately followed by ``str(value)``."""
    return prefix + str(value)


def labelprod(id):
    """Build the product label for a row id, e.g. ``labelprod(1) == 'covtype1'``."""
    return _prefixed_label('covtype', id)


def labelcust(id):
    """Build the customer label for a row id, e.g. ``labelcust(1) == 'fctype1'``."""
    return _prefixed_label('fctype', id)

# Replace the placeholder `Label` with the string "covtype<id>".
# Built-in column functions produce the same string as a Python UDF would, but
# keep the work inside Spark instead of shipping every row to a Python worker.
from pyspark.sql import functions as F

df_prep = data.withColumn("Label", F.concat(F.lit("covtype"), F.col("id").cast("string")))
df_prep.show()

+---+---------+-----+------+---------+--------------------------------+-------------------------------+-------------+--------------+-------------+----------------------------------+
| id|    Label|Slope|Aspect|Elevation|Horizontal_Distance_To_Hydrology|Horizontal_Distance_To_Roadways|Hillshade_9am|Hillshade_Noon|Hillshade_3pm|Horizontal_Distance_To_Fire_Points|
+---+---------+-----+------+---------+--------------------------------+-------------------------------+-------------+--------------+-------------+----------------------------------+
|  1| covtype1|    3|    51|     2596|                             258|                            510|          221|           232|          148|                              6279|
|  2| covtype2|    2|    56|     2590|                             212|                            390|          220|           235|          151|                              6225|
|  3| covtype3|    9|   139|     2804|                             268|                   

In [15]:
# Expose df_prep to Spark SQL under the view name `prep`.
df_prep.createOrReplaceTempView(name="prep")

In [16]:
# Split the prepared rows in two by `id`:
# - the first half becomes the "product" set (ids and labels kept as-is);
# - the second half becomes the "customer" set, renumbered from 1 and labelled
#   "fctype<id>" (it gets the extra row when the row count is odd).
# The split point is derived from the row count instead of a hard-coded number,
# so this cell stays correct if the dataset or the filter above changes.
split_id = df_prep.count() // 2

df_prod = spark.sql("SELECT * FROM prep WHERE `id` <= {}".format(split_id))

# Customer ids are renumbered following the original `id` order (ORDER BY NULL
# would leave that order unspecified), and the label is built with built-in SQL
# functions rather than a Python UDF.
df_cust = spark.sql("""
    SELECT `new_id` AS `id`,
           CONCAT('fctype', CAST(`new_id` AS STRING)) AS `Label`,
           `Slope`, `Aspect`, `Elevation`, `Horizontal_Distance_To_Hydrology`,
           `Horizontal_Distance_To_Roadways`, `Hillshade_9am`, `Hillshade_Noon`,
           `Hillshade_3pm`, `Horizontal_Distance_To_Fire_Points`
    FROM (
        SELECT ROW_NUMBER() OVER (ORDER BY `id`) AS `new_id`, *
        FROM prep
        WHERE `id` > {split_id}
    ) AS renumbered
""".format(split_id=split_id))

In [17]:
# Preview the first 20 customer rows.
df_cust.show(n=20)

+---+--------+-----+------+---------+--------------------------------+-------------------------------+-------------+--------------+-------------+----------------------------------+
| id|   Label|Slope|Aspect|Elevation|Horizontal_Distance_To_Hydrology|Horizontal_Distance_To_Roadways|Hillshade_9am|Hillshade_Noon|Hillshade_3pm|Horizontal_Distance_To_Fire_Points|
+---+--------+-----+------+---------+--------------------------------+-------------------------------+-------------+--------------+-------------+----------------------------------+
|  1| fctype1|    9|   120|     2944|                             108|                           1236|          236|           232|          127|                               560|
|  2| fctype2|    4|   122|     2942|                              85|                           1266|          228|           236|          143|                               532|
|  3| fctype3|    2|   119|     2940|                              67|                         

In [18]:
# Expose both halves to Spark SQL as the views `product` and `customer`.
df_prod.createOrReplaceTempView(name="product")
df_cust.createOrReplaceTempView(name="customer")

In [19]:
# Number of rows in the product half of the split.
df_prod.count()

273149

In [20]:
# Number of rows in the customer half of the split.
df_cust.count()

273149

## Skenario

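Each entry is `<number of rows> <number of dimensions>`:

- 10000 3
- 20000 3
- 50000 3
- 100000 3
- 200000 3

- 10000 4
- 10000 5
- 10000 6
- 10000 7

Note: the export cells below currently generate a different set, which does not match the list above:
- 500, 1000, 2000, 5000 and 10000 rows with 3 dimensions;
- 2000 rows with 4, 5, 6 and 7 dimensions.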

In [21]:
import os

# pandas is not referenced directly below; DataFrame.toPandas() requires it to be installed.
import pandas

# Directory that receives the exported product/customer CSV files. Set
# FC_EXPORT_PATH to override it; the default is the author's original location.
# The alternative path that used to be left commented out here was
# '/home/mocatfrio/Documents/tugas-akhir/app/deploy/dataset/fc'.
export_path = os.environ.get(
    'FC_EXPORT_PATH',
    '/Users/mocatfrio/Documents/thesis/tugas-akhir/App/deploy/dataset/fc',
)
# to_csv() does not create missing directories, so make sure the target exists.
os.makedirs(export_path, exist_ok=True)

In [22]:
# Scenario: the first 500 rows of `product` and `customer`, 3 dimensions.
# The row limit, the selected columns and the `<table>_fc_<rows>_<dims>.csv`
# file name all come from the two settings below, so they cannot drift apart.
# NOTE(review): to_csv() also writes the pandas index as an unnamed first
# column; confirm the consumer of these files expects it before using index=False.
n_rows = 500
dim_cols = ['Elevation', 'Horizontal_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways']
select_list = ', '.join('`{}`'.format(name) for name in ['id', 'Label', 'Slope', 'Aspect'] + dim_cols)
df_p = spark.sql('SELECT {} FROM product WHERE `id` <= {}'.format(select_list, n_rows))
df_c = spark.sql('SELECT {} FROM customer WHERE `id` <= {}'.format(select_list, n_rows))
file_suffix = 'fc_{}_{}.csv'.format(n_rows, len(dim_cols))
df_p.toPandas().to_csv(os.path.join(export_path, 'product_' + file_suffix))
df_c.toPandas().to_csv(os.path.join(export_path, 'customer_' + file_suffix))

In [23]:
# Scenario: the first 1000 rows of `product` and `customer`, 3 dimensions.
# The row limit, the selected columns and the `<table>_fc_<rows>_<dims>.csv`
# file name all come from the two settings below, so they cannot drift apart.
n_rows = 1000
dim_cols = ['Elevation', 'Horizontal_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways']
select_list = ', '.join('`{}`'.format(name) for name in ['id', 'Label', 'Slope', 'Aspect'] + dim_cols)
df_p = spark.sql('SELECT {} FROM product WHERE `id` <= {}'.format(select_list, n_rows))
df_c = spark.sql('SELECT {} FROM customer WHERE `id` <= {}'.format(select_list, n_rows))
file_suffix = 'fc_{}_{}.csv'.format(n_rows, len(dim_cols))
df_p.toPandas().to_csv(os.path.join(export_path, 'product_' + file_suffix))
df_c.toPandas().to_csv(os.path.join(export_path, 'customer_' + file_suffix))

In [24]:
# Scenario: the first 2000 rows of `product` and `customer`, 3 dimensions.
# The row limit, the selected columns and the `<table>_fc_<rows>_<dims>.csv`
# file name all come from the two settings below, so they cannot drift apart.
n_rows = 2000
dim_cols = ['Elevation', 'Horizontal_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways']
select_list = ', '.join('`{}`'.format(name) for name in ['id', 'Label', 'Slope', 'Aspect'] + dim_cols)
df_p = spark.sql('SELECT {} FROM product WHERE `id` <= {}'.format(select_list, n_rows))
df_c = spark.sql('SELECT {} FROM customer WHERE `id` <= {}'.format(select_list, n_rows))
file_suffix = 'fc_{}_{}.csv'.format(n_rows, len(dim_cols))
df_p.toPandas().to_csv(os.path.join(export_path, 'product_' + file_suffix))
df_c.toPandas().to_csv(os.path.join(export_path, 'customer_' + file_suffix))

In [25]:
# Scenario: the first 5000 rows of `product` and `customer`, 3 dimensions.
# The row limit, the selected columns and the `<table>_fc_<rows>_<dims>.csv`
# file name all come from the two settings below, so they cannot drift apart.
n_rows = 5000
dim_cols = ['Elevation', 'Horizontal_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways']
select_list = ', '.join('`{}`'.format(name) for name in ['id', 'Label', 'Slope', 'Aspect'] + dim_cols)
df_p = spark.sql('SELECT {} FROM product WHERE `id` <= {}'.format(select_list, n_rows))
df_c = spark.sql('SELECT {} FROM customer WHERE `id` <= {}'.format(select_list, n_rows))
file_suffix = 'fc_{}_{}.csv'.format(n_rows, len(dim_cols))
df_p.toPandas().to_csv(os.path.join(export_path, 'product_' + file_suffix))
df_c.toPandas().to_csv(os.path.join(export_path, 'customer_' + file_suffix))

In [26]:
# Scenario: the first 10000 rows of `product` and `customer`, 3 dimensions.
# The row limit, the selected columns and the `<table>_fc_<rows>_<dims>.csv`
# file name all come from the two settings below, so they cannot drift apart.
n_rows = 10000
dim_cols = ['Elevation', 'Horizontal_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways']
select_list = ', '.join('`{}`'.format(name) for name in ['id', 'Label', 'Slope', 'Aspect'] + dim_cols)
df_p = spark.sql('SELECT {} FROM product WHERE `id` <= {}'.format(select_list, n_rows))
df_c = spark.sql('SELECT {} FROM customer WHERE `id` <= {}'.format(select_list, n_rows))
file_suffix = 'fc_{}_{}.csv'.format(n_rows, len(dim_cols))
df_p.toPandas().to_csv(os.path.join(export_path, 'product_' + file_suffix))
df_c.toPandas().to_csv(os.path.join(export_path, 'customer_' + file_suffix))

In [27]:
# Scenario: the first 2000 rows of `product` and `customer`, 4 dimensions.
# The row limit, the selected columns and the `<table>_fc_<rows>_<dims>.csv`
# file name all come from the two settings below, so they cannot drift apart.
n_rows = 2000
dim_cols = ['Elevation', 'Horizontal_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways',
            'Hillshade_9am']
select_list = ', '.join('`{}`'.format(name) for name in ['id', 'Label', 'Slope', 'Aspect'] + dim_cols)
df_p = spark.sql('SELECT {} FROM product WHERE `id` <= {}'.format(select_list, n_rows))
df_c = spark.sql('SELECT {} FROM customer WHERE `id` <= {}'.format(select_list, n_rows))
file_suffix = 'fc_{}_{}.csv'.format(n_rows, len(dim_cols))
df_p.toPandas().to_csv(os.path.join(export_path, 'product_' + file_suffix))
df_c.toPandas().to_csv(os.path.join(export_path, 'customer_' + file_suffix))

In [28]:
# Scenario: the first 2000 rows of `product` and `customer`, 5 dimensions.
# The row limit, the selected columns and the `<table>_fc_<rows>_<dims>.csv`
# file name all come from the two settings below, so they cannot drift apart.
n_rows = 2000
dim_cols = ['Elevation', 'Horizontal_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways',
            'Hillshade_9am', 'Hillshade_Noon']
select_list = ', '.join('`{}`'.format(name) for name in ['id', 'Label', 'Slope', 'Aspect'] + dim_cols)
df_p = spark.sql('SELECT {} FROM product WHERE `id` <= {}'.format(select_list, n_rows))
df_c = spark.sql('SELECT {} FROM customer WHERE `id` <= {}'.format(select_list, n_rows))
file_suffix = 'fc_{}_{}.csv'.format(n_rows, len(dim_cols))
df_p.toPandas().to_csv(os.path.join(export_path, 'product_' + file_suffix))
df_c.toPandas().to_csv(os.path.join(export_path, 'customer_' + file_suffix))

In [29]:
# Scenario: the first 2000 rows of `product` and `customer`, 6 dimensions.
# The row limit, the selected columns and the `<table>_fc_<rows>_<dims>.csv`
# file name all come from the two settings below, so they cannot drift apart.
n_rows = 2000
dim_cols = ['Elevation', 'Horizontal_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways',
            'Hillshade_9am', 'Hillshade_Noon', 'Hillshade_3pm']
select_list = ', '.join('`{}`'.format(name) for name in ['id', 'Label', 'Slope', 'Aspect'] + dim_cols)
df_p = spark.sql('SELECT {} FROM product WHERE `id` <= {}'.format(select_list, n_rows))
df_c = spark.sql('SELECT {} FROM customer WHERE `id` <= {}'.format(select_list, n_rows))
file_suffix = 'fc_{}_{}.csv'.format(n_rows, len(dim_cols))
df_p.toPandas().to_csv(os.path.join(export_path, 'product_' + file_suffix))
df_c.toPandas().to_csv(os.path.join(export_path, 'customer_' + file_suffix))

In [30]:
# Scenario: the first 2000 rows of `product` and `customer`, 7 dimensions
# (every prepared column). The columns are listed explicitly instead of
# `SELECT *`, so the dimension count in the file name is tied to the actual
# columns and cannot silently change if the upstream schema does.
n_rows = 2000
dim_cols = ['Elevation', 'Horizontal_Distance_To_Hydrology', 'Horizontal_Distance_To_Roadways',
            'Hillshade_9am', 'Hillshade_Noon', 'Hillshade_3pm',
            'Horizontal_Distance_To_Fire_Points']
select_list = ', '.join('`{}`'.format(name) for name in ['id', 'Label', 'Slope', 'Aspect'] + dim_cols)
df_p = spark.sql('SELECT {} FROM product WHERE `id` <= {}'.format(select_list, n_rows))
df_c = spark.sql('SELECT {} FROM customer WHERE `id` <= {}'.format(select_list, n_rows))
file_suffix = 'fc_{}_{}.csv'.format(n_rows, len(dim_cols))
df_p.toPandas().to_csv(os.path.join(export_path, 'product_' + file_suffix))
df_c.toPandas().to_csv(os.path.join(export_path, 'customer_' + file_suffix))