#### Spark DataFrame API 
Метод работы со spark, похожий на привычный нам pandas. За исключением того, что все происходит на кластере

Для работы с api нам понадобится переменная **spark** которая содержит метаинформацию о подключении к spark. Она чем-то похожа на знакомую нам переменную **sc**

In [None]:
spark

### Чтение данных из csv

In [None]:
df = spark.read.csv("crimes.csv", header=True)

In [None]:
df.columns

In [None]:
df.head(5)

In [None]:
df[["Latitude", "Longitude"]].show(5)

In [None]:
df_short = df[['Arrest', 'Primary Type', 'Longitude', 'Latitude']]

In [None]:
df_short.show(4)

#### Преобразуем строковые значения в числа

In [None]:
df_short = df_short.withColumn('LongitudeFloat', df_short["Longitude"].cast("float"))
df_short = df_short.withColumn('LatitudeFloat', df_short["Latitude"].cast("float"))
df_short = df_short.withColumn('ArrestInt', df_short["Arrest"].cast("boolean").cast("int"))
df_short.head()

In [None]:
features = ['ArrestInt', 'Primary Type', 'LongitudeFloat', 'LatitudeFloat']
df_short = df_short[features]
df_short = df_short.withColumnRenamed('LongitudeFloat', 'Longitude')\
                   .withColumnRenamed('LatitudeFloat', 'Latitude')\
                   .withColumnRenamed('ArrestInt', 'Arrest')  

In [None]:
df_short.show(5)

##### Есть много функций для агрегации данных, например mean

In [None]:
from pyspark.sql.functions import mean

df_short.select(mean(df_short["Latitude"])).collect()

#### Можно зарегестрировать таблицу как SQL и работать с ней: 

In [None]:
df_short.createOrReplaceTempView("crimes_table")

In [None]:
new_df = spark.sql("select * from crimes_table where Latitude > 41.89139 and Latitude < 41.89140")

In [16]:
new_df.show()

+------+-------------------+----------+---------+
|Arrest|       Primary Type| Longitude| Latitude|
+------+-------------------+----------+---------+
|     0|            BATTERY|-87.744385|  41.8914|
|     0|      OTHER OFFENSE|-87.744316|  41.8914|
|     0|           BURGLARY| -87.74448|  41.8914|
|     0|           BURGLARY|-87.669586|41.891396|
|     0|              THEFT| -87.74429|  41.8914|
|     0|    CRIMINAL DAMAGE| -87.74448|  41.8914|
|     0|            BATTERY| -87.74448|  41.8914|
|     0|    CRIMINAL DAMAGE| -87.74443|  41.8914|
|     0|    CRIMINAL DAMAGE| -87.74443|  41.8914|
|     0|    CRIMINAL DAMAGE| -87.74448|  41.8914|
|     0|              THEFT| -87.66228| 41.89139|
|     0|    CRIMINAL DAMAGE| -87.74443|  41.8914|
|     0|           BURGLARY| -87.74448|  41.8914|
|     0|            BATTERY| -87.74429|  41.8914|
|     0|MOTOR VEHICLE THEFT| -87.74425|  41.8914|
|     0|    CRIMINAL DAMAGE| -87.74443|  41.8914|
|     0|    CRIMINAL DAMAGE| -87.67123|  41.8914|


### SparkML

### Подготовка фичей

In [17]:
from pyspark.ml.feature import OneHotEncoderEstimator, StringIndexer

In [18]:
type_indexer = StringIndexer(inputCol="Primary Type", outputCol="type")

In [19]:
df_short = type_indexer.fit(df_short).transform(df_short)

In [20]:
df_short.show(4)

+------+-------------+----------+---------+----+
|Arrest| Primary Type| Longitude| Latitude|type|
+------+-------------+----------+---------+----+
|     0|      BATTERY|-87.744385|  41.8914| 1.0|
|     1|OTHER OFFENSE| -87.66532|41.773373| 5.0|
|     0|      BATTERY| -87.59664| 41.81386| 1.0|
|     0|      BATTERY| -87.62262|41.800804| 1.0|
+------+-------------+----------+---------+----+
only showing top 4 rows



In [21]:
df_short = df_short["Arrest", "Longitude", "Latitude", "type"]

In [22]:
df_short.show(4)

+------+----------+---------+----+
|Arrest| Longitude| Latitude|type|
+------+----------+---------+----+
|     0|-87.744385|  41.8914| 1.0|
|     1| -87.66532|41.773373| 5.0|
|     0| -87.59664| 41.81386| 1.0|
|     0| -87.62262|41.800804| 1.0|
+------+----------+---------+----+
only showing top 4 rows



In [23]:
encoder = OneHotEncoderEstimator(inputCols=['type'], outputCols=['type_vec'])

In [24]:
model = encoder.fit(df_short)

In [25]:
df_short = model.transform(df_short)

In [26]:
df_short.show(5)

+------+----------+---------+----+--------------+
|Arrest| Longitude| Latitude|type|      type_vec|
+------+----------+---------+----+--------------+
|     0|-87.744385|  41.8914| 1.0|(33,[1],[1.0])|
|     1| -87.66532|41.773373| 5.0|(33,[5],[1.0])|
|     0| -87.59664| 41.81386| 1.0|(33,[1],[1.0])|
|     0| -87.62262|41.800804| 1.0|(33,[1],[1.0])|
|     0|-87.743355|41.878063| 9.0|(33,[9],[1.0])|
+------+----------+---------+----+--------------+
only showing top 5 rows



In [27]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['Longitude', 'Latitude', 'type_vec'], outputCol="features", handleInvalid='skip')

df_short = assembler.transform(df_short)

In [28]:
df_short.show(5)

+------+----------+---------+----+--------------+--------------------+
|Arrest| Longitude| Latitude|type|      type_vec|            features|
+------+----------+---------+----+--------------+--------------------+
|     0|-87.744385|  41.8914| 1.0|(33,[1],[1.0])|(35,[0,1,3],[-87....|
|     1| -87.66532|41.773373| 5.0|(33,[5],[1.0])|(35,[0,1,7],[-87....|
|     0| -87.59664| 41.81386| 1.0|(33,[1],[1.0])|(35,[0,1,3],[-87....|
|     0| -87.62262|41.800804| 1.0|(33,[1],[1.0])|(35,[0,1,3],[-87....|
|     0|-87.743355|41.878063| 9.0|(33,[9],[1.0])|(35,[0,1,11],[-87...|
+------+----------+---------+----+--------------+--------------------+
only showing top 5 rows



In [29]:
df_short = df_short[["Arrest", "features"]]
df_short.show(4)

+------+--------------------+
|Arrest|            features|
+------+--------------------+
|     0|(35,[0,1,3],[-87....|
|     1|(35,[0,1,7],[-87....|
|     0|(35,[0,1,3],[-87....|
|     0|(35,[0,1,3],[-87....|
+------+--------------------+
only showing top 4 rows



### ML на больших данных

In [30]:
(training_data, test_data) = df_short.randomSplit([0.7, 0.3])


In [31]:
training_data.show(5)

+------+--------------------+
|Arrest|            features|
+------+--------------------+
|     0|(35,[0,1,2],[-87....|
|     0|(35,[0,1,2],[-87....|
|     0|(35,[0,1,2],[-87....|
|     0|(35,[0,1,2],[-87....|
|     0|(35,[0,1,2],[-87....|
+------+--------------------+
only showing top 5 rows



In [32]:
from pyspark.ml.classification import RandomForestClassifier

In [33]:
rf = RandomForestClassifier(labelCol="Arrest", featuresCol="features", numTrees=10)
model = rf.fit(training_data)

In [34]:
prediction = model.transform(training_data)

In [35]:
prediction.show(5)

+------+--------------------+--------------------+--------------------+----------+
|Arrest|            features|       rawPrediction|         probability|prediction|
+------+--------------------+--------------------+--------------------+----------+
|     0|(35,[0,1,2],[-87....|[8.29124610884311...|[0.82912461088431...|       0.0|
|     0|(35,[0,1,2],[-87....|[8.29124610884311...|[0.82912461088431...|       0.0|
|     0|(35,[0,1,2],[-87....|[8.29124610884311...|[0.82912461088431...|       0.0|
|     0|(35,[0,1,2],[-87....|[8.29124610884311...|[0.82912461088431...|       0.0|
|     0|(35,[0,1,2],[-87....|[8.29124610884311...|[0.82912461088431...|       0.0|
+------+--------------------+--------------------+--------------------+----------+
only showing top 5 rows



In [36]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator()
evaluator.setLabelCol("Arrest")
evaluator.setMetricName("areaUnderROC")
evaluator.evaluate(prediction)

0.7959037153469779