## Pandas Series and DataFrame

In [None]:
%pyspark 

import pandas as pd

# Demo table of people: one inner list per row (name, gender, age).
people = [
    ['frank', 'M', 29],
    ['mary', 'F', 23],
    ['tom', 'M', 35],
    ['ted', 'M', 33],
    ['jean', 'F', 21],
    ['lisa', 'F', 20],
]

# Column labels are supplied at construction time rather than assigned afterwards.
df = pd.DataFrame(people, columns=['name', 'gender', 'age'])
df


# --- Series basics ---
# With no explicit index the Series is labelled 0..n-1.
values = [11, 22, 33, 44, 55]
s = pd.Series(values)
s

# Single-value aggregations, then a one-shot statistical summary.
s.max()
s.min()
s.mean()
s.describe()

# With the default integer labels, 2 picks the third element and 2:4 slices
# by position (end excluded).
s[2]
s[2:4]

# Replace the default labels with letters; the values are untouched.
s.index = list('abcde')
s

# Lookup now goes through the new string labels.
s['c']

# --- Two Series -> one DataFrame ---
age = pd.Series([22, 34, 42])
name = pd.Series(['mary', 'toby', 'sherry'])

# Each Series becomes one row, so transpose to get one column per Series.
pd.DataFrame([name, age]).transpose()


# Rebuild the demo table so the statements below do not depend on earlier ones.
df = pd.DataFrame(
    [
        ['frank', 'M', 29],
        ['mary', 'F', 23],
        ['tom', 'M', 35],
        ['ted', 'M', 33],
        ['jean', 'F', 21],
        ['lisa', 'F', 20],
    ],
    columns=['name', 'gender', 'age'],
)
df

# Summary statistics of the numeric column.
df.describe()

# Row access by label. `.ix` was removed in pandas 1.0; on this integer index
# `.loc` selects the same rows that `.ix` used to.
df.loc[1]

# Label slices include both endpoints: rows 1, 2, 3 and 4.
df.loc[1:4]

# Column subset.
df[['name', 'age']]

# Boolean mask, then the rows where the mask is True.
df['gender'] == 'M'
df[df['gender'] == 'M']

# Per-gender means. numeric_only=True skips the string columns, which would
# otherwise raise TypeError on pandas 2.x.
df[df['gender'] == 'M'].mean(numeric_only=True)
df[df['gender'] == 'F'].mean(numeric_only=True)

# The same comparison in one step with groupby.
df.groupby('gender')['age'].mean()

## SparkSQL

In [None]:
%pyspark
from pyspark.sql import Row, SQLContext

# `sc` is not defined in this notebook; it is assumed to be the SparkContext
# supplied by the interpreter.
sqlContext = SQLContext(sc)

data_file = "file:///tmp/ratings.txt"
raw_data = sc.textFile(data_file)
raw_data.take(3)

# The first line is treated as the header.
header = raw_data.first()
header


def _is_rating_line(line):
    """Return True for every line that differs from the header line."""
    return line != header


def _to_rating_row(fields):
    """Build a Row from the first three '::'-separated fields; rating becomes an int."""
    return Row(userid=fields[0], itemid=fields[1], rating=int(fields[2]))


skip_data = raw_data.filter(_is_rating_line)
skip_data.take(3)

csv_data = skip_data.map(lambda line: line.split('::'))
csv_data.take(3)

row_data = csv_data.map(_to_rating_row)
row_data.take(3)

### Spark DataFrame 操作

In [None]:
%pyspark
# Build a Spark DataFrame from the RDD of Row objects.
# NOTE: this rebinds `df`, which held a pandas DataFrame earlier in the notebook.
df = sqlContext.createDataFrame(row_data)

# Quick ways to peek at the data: df.show(5) or df.take(5)

# DataFrame counterpart of:
#   select itemid, rating from df where rating >= 4 limit 5
well_rated = df.where('rating >= 4').select('itemid', 'rating')
well_rated.show(5)

# Average rating per user. Only userid and rating are selected before grouping.
ratings_by_user = df.select('userid', 'rating').groupBy('userid')
ratings_by_user.avg().show()

### SparkSQL

In [None]:
%pyspark
# Make the DataFrame queryable from SQL under the table name "ratings".
df.registerTempTable("ratings")
# df.printSchema() can be used here to inspect the column types.

# Five items with the highest average rating.
top_items_query = """
SELECT itemid,avg(rating) as avg_rating from ratings group by itemid order by avg(rating) desc limit 5
"""
ratings_data = sqlContext.sql(top_items_query)
ratings_data.show()



### 將 Spark DataFrame 轉換為 rdd

In [None]:
def _describe_rating(row):
    """Render one result Row as 'itemid: <itemid>, rating: <avg_rating>'."""
    return 'itemid: {}, rating: {}'.format(row.itemid, row.avg_rating)


# `.rdd` exposes the DataFrame's rows as an RDD of Row objects.
rating_out = ratings_data.rdd.map(_describe_rating)
rating_out.take(3)


### 將 Spark DataFrame 轉換為 Pandas DataFrame 

In [None]:
# Convert the Spark query result into a pandas DataFrame for local inspection.
# ratings_data comes from a LIMIT 5 query, so the converted frame is small.
# NOTE(review): toPandas() is documented to load all rows into driver memory,
# so avoid it on large results.
pandas_df = ratings_data.toPandas()
pandas_df

## 找出最受歡迎的電影

### 產生 Movies DataFrame

In [None]:
%pyspark
def _to_movie_row(fields):
    """Keep the first two '|'-separated fields as movieid and moviename."""
    return Row(movieid=fields[0], moviename=fields[1])


movie_raw = sc.textFile('file:///tmp/u.item')
movie_parsed = movie_raw.map(lambda line: line.split('|'))
movie_row = movie_parsed.map(_to_movie_row)
movie_row.take(3)

# Register the movies so they can be joined from SQL as table "movies".
moviedf = sqlContext.createDataFrame(movie_row)
moviedf.registerTempTable("movies")

### 產生 Ratings DataFrame

In [None]:
%pyspark
def _to_movie_rating_row(fields):
    """Build a Row from the first three whitespace-separated fields; rating becomes an int."""
    return Row(userid=fields[0], movieid=fields[1], rating=int(fields[2]))


rating_raw = sc.textFile('file:///tmp/u.data')
# str.split() with no argument splits on any run of whitespace.
rating_parsed = rating_raw.map(lambda line: line.split())
rating_row = rating_parsed.map(_to_movie_rating_row)
rating_row.take(3)

# Registered as "ratings", the same table name used earlier in the notebook.
ratingdf = sqlContext.createDataFrame(rating_row)
ratingdf.registerTempTable("ratings")

### 將兩個表格合併

In [None]:
%pyspark
# Show the column types of both sides of the join.
for frame in (moviedf, ratingdf):
    frame.printSchema()

# Ten movie names with the largest number of ratings.
most_rated_query = """
    SELECT moviename,count(rating) as rating_cnt from ratings inner join movies on movies.movieid = ratings.movieid group by moviename order by rating_cnt DESC LIMIT 10
"""
ratings_data = sqlContext.sql(most_rated_query)
# Alternative ranking by average rating instead of rating count:
#SELECT moviename,avg(rating) as avg_rating from ratings inner join movies on movies.movieid = ratings.movieid group by moviename order by avg_rating DESC LIMIT 10
ratings_data.show()

## 分類模型

### 讀取資料

In [None]:
%pyspark
raw_data = sc.textFile("file:///tmp/churnTrain.csv")
raw_data.take(3)

# The first line is treated as the header.
header = raw_data.first()
header


def _is_churn_line(line):
    """Return True for every line that differs from the header line."""
    return line != header


skip_data = raw_data.filter(_is_churn_line)
skip_data.take(3)

# Plain comma split; quotes around field values are left in place.
splitlines = skip_data.map(lambda record: record.split(","))
splitlines.take(3)

### 資料預處理

In [None]:
%pyspark

from pyspark.mllib.linalg import Vectors
from pyspark.mllib.regression import LabeledPoint

def parseLine(col):
    """Turn one split CSV record into an MLlib LabeledPoint.

    Args:
        col: list of string fields from one data line. As indexed here,
            col[4] and col[5] are quoted yes/no flags (e.g. '"no"'),
            col[6:-1] are used as numeric features, and col[-1] is the
            quoted churn flag.

    Returns:
        LabeledPoint whose label is 0 when the last field is '"no"' and 1
        otherwise, and whose features are the two encoded flags followed by
        the numeric fields.

    Raises:
        ValueError: if any of col[6:-1] cannot be parsed as a float.
    """
    churn = col[-1]
    # Any value other than the quoted literal "no" is encoded as 1.
    international = 0.0 if col[4] == '"no"' else 1.0
    voice = 0.0 if col[5] == '"no"' else 1.0
    label = 0 if churn == '"no"' else 1

    features = [international, voice]
    # Convert explicitly instead of handing raw strings to Vectors.dense and
    # relying on its implicit coercion.
    features.extend(float(value) for value in col[6:-1])
    return LabeledPoint(label, Vectors.dense(features))


trainData = splitlines.map(parseLine)
trainData.take(3)

### 建立決策樹

In [None]:
%pyspark
from pyspark.mllib.tree import DecisionTree

# Binary decision tree on the full data set. The empty categoricalFeaturesInfo
# declares no feature as categorical.
tree_params = dict(
    numClasses=2,
    categoricalFeaturesInfo={},
    impurity='gini',
    maxDepth=5,
)
model = DecisionTree.trainClassifier(trainData, **tree_params)
         

### 印出決策樹

In [None]:
%pyspark
# Print a heading, then the text dump of the learned tree.
print("Learned classification tree model:")
tree_text = model.toDebugString()
print(tree_text)

### 評估模型

In [None]:
%pyspark
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Sanity check: predict for a single training example.
head = trainData.first()
model.predict(head.features)

# Predict for the whole training set.
predictions = model.predict(trainData.map(lambda p: p.features))
#predictions.collect()

# Pair every true label with its prediction. This name was previously used in
# this cell without being defined, so a fresh run raised NameError.
labels_and_preds = trainData.map(lambda p: p.label).zip(predictions)

# Fraction of examples whose prediction equals the label.
# NOTE: despite its name, this is accuracy on the training data.
filtered_labels_and_preds = labels_and_preds.filter(lambda v: v[0] == v[1])
test_accuracy = filtered_labels_and_preds.count() / float(trainData.count())
test_accuracy

# BinaryClassificationMetrics expects (score, label) pairs, so swap the
# (label, prediction) tuples before handing them over.
scores_and_labels = labels_and_preds.map(lambda lp: (lp[1], lp[0]))
metrics = BinaryClassificationMetrics(scores_and_labels)
print("Area under PR = %s" % metrics.areaUnderPR)
print("Area under ROC = %s" % metrics.areaUnderROC)

### 將資料區分為訓練與測試資料集

In [None]:
# 70/30 train/test split. A fixed seed keeps the split, and every metric
# computed from it, reproducible across runs.
train_test_dataset = trainData.randomSplit([0.7, 0.3], seed=42)
trainset, testset = train_test_dataset

### 根據訓練資料建立模型

In [None]:
%pyspark
from pyspark.mllib.tree import DecisionTree

# Same tree settings as before, but fitted on the training split only.
split_tree_params = dict(
    numClasses=2,
    categoricalFeaturesInfo={},
    impurity='gini',
    maxDepth=5,
)
model = DecisionTree.trainClassifier(trainset, **split_tree_params)
         

### 根據測試資料評估模型

In [None]:
%pyspark
# Imported here so this cell does not depend on an import made in another cell.
from pyspark.mllib.evaluation import BinaryClassificationMetrics

# Predict for the held-out examples and pair each true label with its prediction.
predictions = model.predict(testset.map(lambda p: p.features))
labels_and_preds = testset.map(lambda p: p.label).zip(predictions)

# BinaryClassificationMetrics expects (score, label) pairs, so swap the
# (label, prediction) tuples before handing them over.
scores_and_labels = labels_and_preds.map(lambda lp: (lp[1], lp[0]))
metrics = BinaryClassificationMetrics(scores_and_labels)

print("Area under PR = %s" % metrics.areaUnderPR)
print("Area under ROC = %s" % metrics.areaUnderROC)