In [0]:
# Importing cassandra into pyspark,
import os
import cassandra
# PySpark reads PYSPARK_SUBMIT_ARGS when it launches its JVM, so this must be set
# before any SparkSession is built. It pulls in the DataStax Spark-Cassandra
# connector package and configures the connector's Cassandra contact hosts.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--packages', 'com.datastax.spark:spark-cassandra-connector_2.11:2.3.0',
    '--conf', 'spark.cassandra.connection.host=192.168.0.123,192.168.0.124',
    'pyspark-shell',
])

In [0]:
# Open a Cassandra driver session against the node on localhost; `session`
# is used by later cells to run CQL queries.
# NOTE(review): the Spark connector in PYSPARK_SUBMIT_ARGS points at
# 192.168.0.123/124 while this driver connects to 127.0.0.1 - confirm which
# cluster actually holds dba.sprmkts.
from cassandra.cluster import Cluster
cluster = Cluster(contact_points=['127.0.0.1'])
session = cluster.connect()

In [0]:
# Create (or reuse, via getOrCreate) a local Spark session running with 2 worker threads.
# NOTE(review): with master "local[2]" there are no separate executors, so
# spark.executor.memory and spark.cores.max are likely to have no effect -
# confirm, and consider spark.driver.memory if more memory is needed.
from pyspark.sql import SparkSession
from pyspark import SparkContext
SpSession = (
    SparkSession.builder
    .master("local[2]")
    .appName("iiitmk")
    .config("spark.executor.memory", "10g")
    .config("spark.cores.max", "24")
    .config("spark.sql.warehouse.dir", "/tmp/spark")
    .getOrCreate()
)
    

In [0]:
# Load the dba.sprmkts column family (table) into a pandas DataFrame for preprocessing.
# Each row returned by the driver becomes one DataFrame row.
import pandas as pd
query = "SELECT * from dba.sprmkts;"
result_rows = session.execute(query)
df = pd.DataFrame([row for row in result_rows])

In [0]:
# idcustomer is an identifier, not a predictive feature, so the column is dropped.
# drop() returns a new frame instead of mutating in place and printing the
# removed column (as df.pop did). errors='ignore' keeps the cell safe to re-run:
# the previous df.pop raised KeyError on a second execution.
df = df.drop(columns=['idcustomer'], errors='ignore')

In [0]:
# The gender column mixes the codes 1, 2, 'M' and 'F'; normalise them to the
# integers 1 and 2 (assuming M = 1, F = 2). Any value not in the mapping -
# including values that are already ints - passes through unchanged.
gender_codes = {'M': 1, 'F': 2, '1': 1, '2': 2}
inter = [gender_codes.get(value, value) for value in df['gender']]
df['gender'] = inter

In [0]:
# Use pandas profiling for exploratory data analysis and visualisation; the
# report object is the cell's last expression so Jupyter renders it inline.
import pandas_profiling as pp
profile_report = pp.ProfileReport(df)
profile_report

In [0]:
# Reduce the skew in the label 'come' by randomly dropping rows of each class.
# Counts and seed are named constants so the drop is reproducible on
# Restart & Run All (the previous version used the unseeded global RNG).
from random import Random, sample

LABEL_COL = 'come'
N_DROP_CLASS0 = 197  # rows with label 0 to remove
N_DROP_CLASS1 = 3    # rows with label 1 to remove
# NOTE(review): dropping minority-class rows looks unintended for a rebalancing step - confirm.
SAMPLING_SEED = 42

# Select candidate rows by the named label column instead of "last column of
# df.values":
# - that position depends on the column order of the query result;
# - df.values[i] mixed an index label with a positional row;
# - rebuilding df.values on every iteration made the loop O(n^2).
# The old loop body was also unindented (IndentationError).
c0 = df.loc[df[LABEL_COL] == 0].index.tolist()
c1 = df.loc[df[LABEL_COL] == 1].index.tolist()

rng = Random(SAMPLING_SEED)
# random.sample raises ValueError if a class has fewer rows than requested.
r0 = rng.sample(c0, N_DROP_CLASS0)
r1 = rng.sample(c1, N_DROP_CLASS1)
r = r1 + r0
df = df.drop(r)

In [0]:
# Create a Spark DataFrame (not an RDD) from the pandas DataFrame and count its rows.
# Uses SpSession, the session built earlier in this notebook: the name `spark`
# is not defined anywhere here, so the previous call failed on a fresh kernel.
data = SpSession.createDataFrame(df)
data.count()

In [0]:
# Summary statistics for a few numeric columns of the Spark DataFrame.
summary_columns = ['age', 'amount1', 'married_or_not']
data.describe(*summary_columns).show()

In [0]:
# Pearson correlation of every numeric column with the label 'come'.
# Column types come from the schema (data.dtypes) rather than from the first
# value of each column. This avoids launching one Spark job per column. It also
# skips non-numeric columns that DataFrameStatFunctions.corr rejects, such as
# booleans or string columns whose first value is null; the old isinstance(str)
# check let those through.
_NUMERIC_SPARK_TYPES = {'tinyint', 'smallint', 'int', 'bigint', 'float', 'double'}
for col_name, col_type in data.dtypes:
    if col_name == 'come':
        continue  # correlation of the label with itself is trivially 1.0
    if col_type in _NUMERIC_SPARK_TYPES or col_type.startswith('decimal'):
        print("Correlation to 'come' for ", col_name, data.stat.corr('come', col_name))

In [0]:
#creating feature,label dataframe for model
from pyspark.ml.linalg import Vectors
def transformToLabeledPoint(row) :
    """Convert a Spark Row into a ``(label, features)`` tuple for Spark ML.

    The label is ``row["come"]``. The features are a dense vector of the listed
    columns, in the order listed.
    NOTE(review): pay_month5 is not among the features - confirm this is intentional.
    """
    feature_names = (
        "balance", "gender", "education_level", "married_or_not", "age",
        "pay_month1", "pay_month2", "pay_month3", "pay_month4",
        "pay_month6", "pay_month7", "amount1",
        "pay_amt1", "pay_amt2", "pay_amt3", "pay_amt4", "pay_amt5", "pay_amt6",
    )
    label = row["come"]
    features = Vectors.dense([row[name] for name in feature_names])
    return label, features


In [0]:
# Map every Row to a (label, features) tuple, then build the two-column
# DataFrame ("label", "features") consumed by the Spark ML estimator below.
labeled_rdd = data.rdd.map(transformToLabeledPoint)
data = SpSession.createDataFrame(labeled_rdd, ["label", "features"])
data.select("label", "features").show(10)

In [0]:
# 80/20 train/test split for the SVM model. A fixed seed makes the split - and
# therefore every downstream metric - reproducible on Restart & Run All.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LinearSVC
SPLIT_SEED = 42
(training_data, test_data) = data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

In [0]:
# Linear SVM with default hyper-parameters: fit on the training split, then
# score the held-out test split. Column names are the estimator defaults,
# spelled out to match the DataFrame built above.
SVMModel = LinearSVC(featuresCol="features", labelCol="label")
Model = SVMModel.fit(training_data)
predictions = Model.transform(test_data)

In [0]:
# Confusion matrices as (label, prediction) counts: first on the held-out test
# predictions, then on the training data for comparison.
for scored in (predictions, Model.transform(training_data)):
    scored.groupBy("label", "prediction").count().show()