<a href="https://colab.research.google.com/github/park-geun-hyeong/Data_Mining/blob/main/PySpark.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [28]:
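path = '/content/drive/MyDrive/Colab Notebooks/'  # Google Drive folder holding the data files used below (assumes Drive is mounted at /content/drive)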

In [2]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

### Spark 3.2 Installation

In [3]:
!wget -q https://archive.apache.org/dist/spark/spark-3.2.0/spark-3.2.0-bin-hadoop3.2.tgz
!tar xf spark-3.2.0-bin-hadoop3.2.tgz

Set the environment variables that let PySpark run in the Colab environment:

In [4]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.2.0-bin-hadoop3.2"

findspark: locate the Spark installation on the system (via `SPARK_HOME`) and make `pyspark` importable

In [5]:
!pip install -q findspark

import findspark
findspark.init()

The entry point into all functionality in Spark is the `SparkSession` class. To create a basic SparkSession, use `SparkSession.builder` (an attribute in Python, not a method call):

Entry points
- Newer entry point: SparkSession (introduced in Spark 2.0)
- Older entry points: SparkContext, SQLContext, and HiveContext

SparkSession
- Replaces SQLContext and HiveContext
- Wraps a SparkContext, which remains available as `spark.sparkContext`

In [6]:
from pyspark.sql import SparkSession 

spark = SparkSession \
        .builder \
        .appName("Example") \
        .getOrCreate()

In [7]:
spark.version

'3.2.0'

Access the underlying SparkContext as follows:

In [10]:
sc = spark.sparkContext  # same as spark._sc

#### Two ways to create RDDs
- Parallelizing an existing collection in your driver program
- Referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat

In [11]:
data = list(range(10000))
distData = sc.parallelize(data)  # distributed dataset
distData

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274

In [12]:
distData.reduce(lambda a, b: a + b)

49995000

In [23]:
from operator import add
distData.reduce(add)

49995000
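
Because `reduce` first combines the elements inside each partition and then combines the per-partition results, the function passed to it should be associative and commutative (as `+` and `operator.add` are). The plain-Python sketch below imitates that two-stage evaluation; the helper names, the partition count, and the even contiguous slicing are assumptions for illustration, not Spark's exact partitioning scheme.

```python
from functools import reduce
from operator import add, sub


def split_into_partitions(data, num_partitions):
    """Split a list into contiguous, roughly equal slices
    (a hypothetical stand-in for how sc.parallelize distributes data)."""
    n = len(data)
    return [data[i * n // num_partitions:(i + 1) * n // num_partitions]
            for i in range(num_partitions)]


def simulated_rdd_reduce(data, func, num_partitions=4):
    """Reduce each partition locally, then reduce the partial results."""
    partitions = split_into_partitions(data, num_partitions)
    partials = [reduce(func, part) for part in partitions if part]
    return reduce(func, partials)


data = list(range(10000))
print(simulated_rdd_reduce(data, add))

# Subtraction is not associative, so the two-stage result differs
# from a sequential left-to-right reduction.
print(simulated_rdd_reduce(data, sub) == reduce(sub, data))
```

With `add`, the partitioned result matches the sequential sum; with `sub`, it does not, which is why non-associative functions give unpredictable results in `RDD.reduce`.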

## Text File

In [None]:
lines = sc.textFile(path + "anna.txt")
# lines = sc.textFile("hdfs://...") ## Hadoop file system

# rdd = sc.textFile('BostonHousing.csv')
# df = spark.read.csv(rdd)
lines

In [24]:
lineLengths = lines.map(lambda s: len(s))
lineLengths

PythonRDD[15] at RDD at PythonRDD.scala:53

In [25]:
totalLength = lineLengths.reduce(add)
totalLength

1944960

In [26]:
wordsPerLine = lines.map(lambda s: len(s.split()))  # number of words on each line
totalWords = wordsPerLine.reduce(add)               # total number of words in the file
totalWords

352929
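
`map` applies a function to each line independently, and `reduce` folds the per-line results into a single number. The same computation on a small in-memory list (hypothetical sample lines standing in for `anna.txt`) looks like this in plain Python. Note that `sc.textFile` drops the line terminators, so the character total does not include newlines.

```python
from functools import reduce
from operator import add

# Hypothetical sample lines standing in for the contents of anna.txt
sample_lines = [
    "Happy families are all alike;",
    "every unhappy family is unhappy in its own way.",
    "",  # blank lines arrive as empty strings
]

line_lengths = list(map(len, sample_lines))              # like lines.map(lambda s: len(s))
total_length = reduce(add, line_lengths)                 # like lineLengths.reduce(add)

words_per_line = [len(s.split()) for s in sample_lines]  # like lines.map(lambda s: len(s.split()))
total_words = reduce(add, words_per_line)                # like wordsPerLine.reduce(add)

print(line_lengths, total_length)
print(words_per_line, total_words)
```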

### Word count

In [27]:
lines = sc.textFile(path + "anna.txt")

In [None]:
# map: Return a new distributed dataset formed by passing 
#      each element of the source through a function func.
# flatMap: Similar to map, but each input item can be 
#          mapped to 0 or more output items 
counts = lines.flatMap(lambda line: line.split(' ')) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b)

# Return all elements of the dataset to the driver as a Python list
counts.collect()
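
The difference between `map` and `flatMap`, and what `reduceByKey` does with the resulting `(word, 1)` pairs, can be imitated with ordinary Python lists and a dictionary. This is only a single-machine sketch of the semantics (the sample lines and the `reduce_by_key` helper are hypothetical); Spark performs the same per-key aggregation across partitions with a shuffle.

```python
from itertools import chain

sample_lines = ["the cat sat", "the dog"]

# map: exactly one output per input line -> a list of lists
mapped = [line.split(' ') for line in sample_lines]

# flatMap: zero or more outputs per input line, flattened into one sequence
flat = list(chain.from_iterable(line.split(' ') for line in sample_lines))

# map each word to a (word, 1) pair
pairs = [(word, 1) for word in flat]


def reduce_by_key(pairs, func):
    """Combine the values of each key with func, like RDD.reduceByKey."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result


counts = reduce_by_key(pairs, lambda a, b: a + b)
print(mapped)
print(flat)
print(counts)
```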

In [30]:
counts = lines.flatMap(lambda line: line.split(' ')) \
              .map(lambda word: (word, 1)) \
              .reduceByKey(lambda a, b: a + b) \
              .sortBy(lambda x: x[1], ascending=False)

counts.take(5)

[('the', 16498), ('and', 11610), ('to', 9989), ('', 9036), ('of', 8594)]
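
The fourth most frequent "word" is the empty string `''`. Splitting on a single space with `split(' ')` yields an empty token for every blank line and for each extra space between words, whereas `split()` with no argument splits on runs of whitespace and drops empty tokens. A quick check in plain Python:

```python
line_with_double_space = "Happy  families"
blank_line = ""

print(line_with_double_space.split(' '))  # ['Happy', '', 'families']
print(blank_line.split(' '))              # ['']
print(line_with_double_space.split())     # ['Happy', 'families']
print(blank_line.split())                 # []
```

Using `line.split()` inside the `flatMap` (or filtering out `''` afterwards) would remove that entry from the ranking.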

Passing Functions to Spark

In [35]:
def containPrince(line):
    return "Prince" in line

princeLines = lines.filter(containPrince) \
                   .collect()  # a Python list on the driver, not an RDD
princeLines[:5]

['Three days after the quarrel, Prince Stepan Arkadyevitch',
 'prepared for the university with the young Prince Shtcherbatsky, the',
 'the young Princess Shtcherbatskaya an offer of marriage; in all',
 '"Delighted to see you," said Princess Shtcherbatskaya. "On Thursdays we',
 'directly; Prince Golistin with a lady. Fresh oysters have come in."']
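
Note that `"Prince" in line` is a substring test, so `containPrince` also keeps lines that mention "Princess" (as the output above shows), which is why "Princess" appears in the results of the following cells. If only the exact word is wanted, a word-boundary regular expression is one option. `containPrinceWord` below is a hypothetical alternative, checked here on two lines taken from the output above; it could be passed to `lines.filter` in the same way.

```python
import re

# Matches "Prince" only as a whole word (not as part of "Princess")
PRINCE_WORD = re.compile(r"\bPrince\b")


def containPrince(line):
    """Substring test, as in the notebook."""
    return "Prince" in line


def containPrinceWord(line):
    """Hypothetical whole-word variant of containPrince."""
    return PRINCE_WORD.search(line) is not None


samples = [
    'Three days after the quarrel, Prince Stepan Arkadyevitch',
    'the young Princess Shtcherbatskaya an offer of marriage; in all',
]
for s in samples:
    print(containPrince(s), containPrinceWord(s), s)
```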

In [47]:
lines.filter(containPrince) \
     .flatMap(lambda line: line.split()) \
     .map(lambda word: (word, 1)) \
     .take(5)  # take(5) avoids collecting the whole RDD just to slice it

[('Three', 1), ('days', 1), ('after', 1), ('the', 1), ('quarrel,', 1)]

In [51]:
lines.filter(containPrince) \
     .flatMap(lambda line: line.split()) \
     .map(lambda word: (word, 1)) \
     .reduceByKey(lambda a, b: a + b) \
     .sortBy(lambda x: x[1], ascending=False) \
     .take(5)

[('Princess', 125), ('the', 78), ('to', 42), ('of', 36), ('and', 33)]

In [34]:
numTokens = lines.filter(containPrince) \
                 .flatMap(lambda line: line.split(' ')) \
                 .map(lambda word: (word, 1)) \
                 .count()  # counting the (word, 1) pairs before combining them gives the total number of tokens (including empty strings from split(' '))
numTokens

1691

Creating DataFrames

In [38]:
df = spark.read.json(path+"people.json")
# Displays the content of the DataFrame to stdout
df.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



In [39]:
df.printSchema()

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)



In [52]:
# Select only the "name" column
df.select("name").show()

+-------+
|   name|
+-------+
|Michael|
|   Andy|
| Justin|
+-------+



In [53]:
# Select everybody, but increment the age by 1
df.select(df['name'], df['age'] + 1).show()

+-------+---------+
|   name|(age + 1)|
+-------+---------+
|Michael|     null|
|   Andy|       31|
| Justin|       20|
+-------+---------+



In [54]:
# Select people older than 21
df.filter(df['age'] > 21).show()

+---+----+
|age|name|
+---+----+
| 30|Andy|
+---+----+



In [55]:
# Count people by age
df.groupBy("age").count().show()

+----+-----+
| age|count|
+----+-----+
|  19|    1|
|  30|    1|
|null|    1|
+----+-----+
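
`groupBy(...).count()` treats null as a group key of its own, which is why the null age appears with a count of 1. A `collections.Counter` gives the same per-key counts on the sample data in plain Python; note that Spark does not guarantee the row order of a `groupBy` result unless it is followed by `orderBy`.

```python
from collections import Counter

ages = [None, 30, 19]  # the age column of people.json as displayed above
age_counts = Counter(ages)
for age, count in age_counts.items():
    print(age, count)
```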



In [56]:
# Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people") ## table name

sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+



### DataFrame Creation

In [57]:
from datetime import datetime, date
from pyspark.sql import Row

df = spark.createDataFrame([
    Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0))
])

df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [58]:
df = spark.createDataFrame([
    (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)),
    (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)),
    (3, 4., 'string3', date(2000, 3, 1), datetime(2000, 1, 3, 12, 0))
], schema='a long, b double, c string, d date, e timestamp')
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [59]:
import pandas as pd

pandas_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})
df = spark.createDataFrame(pandas_df)
df

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [60]:
pandas_df

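|   | a | b   | c       | d          | e                   |
|---|---|-----|---------|------------|---------------------|
| 0 | 1 | 2.0 | string1 | 2000-01-01 | 2000-01-01 12:00:00 |
| 1 | 2 | 3.0 | string2 | 2000-02-01 | 2000-01-02 12:00:00 |
| 2 | 3 | 4.0 | string3 | 2000-03-01 | 2000-01-03 12:00:00 |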


## Machine Learning

In [68]:
# Space-separated file without a header: column 0 is the label, columns 1-16 are features
svm_df = pd.read_csv(path + 'sample_svm_data.txt', sep=' ', header=None)
label = svm_df.iloc[:, 0].values.tolist()

In [70]:
svm_df

|     | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | 16 |
|-----|---|---|---|---|---|---|---|---|---|---|----|----|----|----|----|----|----|
| 0 | 1 | 0.000000 | 2.520784 | 0.000000 | 0.000000 | 0.0 | 2.004684 | 2.000347 | 0.000000 | 2.228387 | 2.228387 | 0.0 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.0 |
| 1 | 0 | 2.857738 | 0.000000 | 0.000000 | 2.619965 | 0.0 | 2.004684 | 2.000347 | 0.000000 | 2.228387 | 2.228387 | 0.0 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.0 |
| 2 | 0 | 2.857738 | 0.000000 | 2.061394 | 0.000000 | 0.0 | 2.004684 | 0.000000 | 0.000000 | 2.228387 | 2.228387 | 0.0 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.0 |
| 3 | 1 | 0.000000 | 0.000000 | 2.061394 | 2.619965 | 0.0 | 2.004684 | 2.000347 | 0.000000 | 0.000000 | 0.000000 | 0.0 | 2.055003 | 0.0 | 0.0 | 0.0 | 0.0 |
| 4 | 1 | 2.857738 | 0.000000 | 2.061394 | 2.619965 | 0.0 | 2.004684 | 0.000000 | 0.000000 | 0.000000 | 0.000000 | 0.0 | 2.055003 | 0.0 | 0.0 | 0.0 | 0.0 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 317 | 1 | 2.857738 | 0.000000 | 0.000000 | 2.619965 | 0.0 | 2.004684 | 2.000347 | 0.000000 | 2.228387 | 2.228387 | 0.0 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.0 |
| 318 | 0 | 2.857738 | 0.000000 | 0.000000 | 2.619965 | 0.0 | 2.004684 | 2.000347 | 2.122974 | 0.000000 | 0.000000 | 0.0 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.0 |
| 319 | 1 | 2.857738 | 0.000000 | 0.000000 | 2.619965 | 0.0 | 0.000000 | 2.000347 | 0.000000 | 2.228387 | 2.228387 | 0.0 | 0.000000 | 0.0 | 0.0 | 0.0 | 0.0 |
| 320 | 1 | 0.000000 | 2.520784 | 0.000000 | 2.619965 | 0.0 | 0.000000 | 0.000000 | 0.000000 | 2.228387 | 2.228387 | 0.0 | 2.055003 | 0.0 | 0.0 | 0.0 | 0.0 |


In [71]:
from pyspark.mllib.classification import SVMWithSGD, SVMModel
from pyspark.mllib.regression import LabeledPoint

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile(path + "sample_svm_data.txt")
parsedData = data.map(parsePoint)

In [82]:
parsedData.collect()[0]

LabeledPoint(1.0, [0.0,2.52078447201548,0.0,0.0,0.0,2.004684436494304,2.000347299268466,0.0,2.228387042742021,2.228387042742023,0.0,0.0,0.0,0.0,0.0,0.0])

In [74]:
# Build the model
model = SVMWithSGD.train(parsedData, iterations=100)

# Evaluating the model on training data
labelsAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
trainErr = labelsAndPreds.filter(lambda lp: lp[0] != lp[1]).count() / float(parsedData.count())
print("Training Error = " + str(trainErr))

Training Error = 0.38198757763975155
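
The training error is the fraction of points whose predicted label differs from the true label, measured on the same data the model was trained on. With hypothetical `(label, prediction)` pairs in place of `labelsAndPreds`, the formula in plain Python reads:

```python
# Hypothetical (label, prediction) pairs standing in for labelsAndPreds
labels_and_preds = [(1.0, 1.0), (0.0, 1.0), (0.0, 0.0), (1.0, 0.0)]

# Count mismatches, then divide by the total number of points
errors = sum(1 for label, pred in labels_and_preds if label != pred)
train_err = errors / float(len(labels_and_preds))
print("Training Error = " + str(train_err))
```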


## K-means clustering

In [83]:
from pyspark.ml.clustering import KMeans

In [84]:
dataset = spark.read.format("libsvm").load(path + "sample_kmeans_data.txt")
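
The `libsvm` format stores one example per line as `label index:value index:value ...`, with 1-based feature indices that Spark turns into 0-based positions of a sparse vector, displayed as `(size,[indices],[values])`. The plain-Python sketch below parses such lines; `parse_libsvm_line` is a hypothetical helper, the input lines are assumed to look like those in `sample_kmeans_data.txt`, and the sketch keeps only non-zero entries, so an all-zero line yields an empty sparse vector like the first row displayed below.

```python
def parse_libsvm_line(line, num_features):
    """Parse 'label idx:val idx:val ...' (1-based indices) into
    (label, (size, indices, values)) with 0-based indices.
    Zero values are skipped, giving a sparse representation."""
    tokens = line.split()
    label = float(tokens[0])
    indices, values = [], []
    for token in tokens[1:]:
        idx, val = token.split(':')
        val = float(val)
        if val != 0.0:
            indices.append(int(idx) - 1)  # convert 1-based to 0-based
            values.append(val)
    return label, (num_features, indices, values)


print(parse_libsvm_line("1 1:0.1 2:0.1 3:0.1", 3))
print(parse_libsvm_line("0 1:0.0 2:0.0 3:0.0", 3))
```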

In [85]:
# Trains a k-means model.
kmeans = KMeans().setK(2).setSeed(1)
model = kmeans.fit(dataset)

In [86]:
# Make predictions
predictions = model.transform(dataset)
predictions.show()  # or predictions.toPandas() for a pandas view

+-----+--------------------+----------+
|label|            features|prediction|
+-----+--------------------+----------+
|  0.0|           (3,[],[])|         1|
|  1.0|(3,[0,1,2],[0.1,0...|         1|
|  2.0|(3,[0,1,2],[0.2,0...|         1|
|  3.0|(3,[0,1,2],[9.0,9...|         0|
|  4.0|(3,[0,1,2],[9.1,9...|         0|
|  5.0|(3,[0,1,2],[9.2,9...|         0|
+-----+--------------------+----------+
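
At its core, k-means alternates between assigning each point to its nearest centroid and moving each centroid to the mean of the points assigned to it (Lloyd's algorithm). The plain-Python sketch below runs those two steps on six 3-D points assumed to match the rows above (each value repeated across the three coordinates, as the truncated `features` column suggests). It starts from hand-picked centroids instead of Spark's default k-means|| seeding, so its cluster ids may be numbered differently from Spark's `prediction` column.

```python
def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))


def mean(points):
    n = len(points)
    return tuple(sum(coords) / n for coords in zip(*points))


def kmeans(points, initial_centroids, iterations=10):
    """Plain Lloyd's algorithm: assign points, then recompute centroids."""
    centroids = list(initial_centroids)
    assignments = []
    for _ in range(iterations):
        # Assignment step: index of the nearest centroid for every point
        assignments = [
            min(range(len(centroids)),
                key=lambda k: squared_distance(p, centroids[k]))
            for p in points
        ]
        # Update step: move each centroid to the mean of its points
        for k in range(len(centroids)):
            members = [p for p, a in zip(points, assignments) if a == k]
            if members:  # keep the old centroid if a cluster becomes empty
                centroids[k] = mean(members)
    return centroids, assignments


# Assumed feature values for the six rows shown above
points = [(v, v, v) for v in (0.0, 0.1, 0.2, 9.0, 9.1, 9.2)]
centroids, assignments = kmeans(points, initial_centroids=[points[0], points[3]])
print(assignments)
print(centroids)
```

As in the Spark output, the three points near the origin end up in one cluster and the three points near 9 in the other.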



#### Session stop

In [None]:
spark.stop()