In [None]:
pip install pyspark

In [None]:
from pyspark import SparkConf,SparkContext
import collections

# Square each element of a small in-memory list on Spark and print the result.
# SparkContext is used as a context manager so stop() runs even if the job
# fails; otherwise the context stays alive and later cells cannot create theirs.
with SparkContext() as sc:
    rdd = sc.parallelize([3, 4, 56, 7.4, 2])
    sq = rdd.map(lambda x: x * x)
    print(sq.collect())

# Rating Histogram

In [None]:
from pyspark import SparkConf, SparkContext
import collections

# Count how often each value of the third whitespace-separated field of u.data
# (treated here as the rating) occurs, then print the counts in sorted order.
conf = SparkConf().setMaster('local').setAppName('RatingsHistogram')

# The with-block stops the context even if a job fails, so a failure here does
# not leave a live SparkContext that blocks the following cells.
with SparkContext(conf=conf) as sc:
    lines = sc.textFile('u.data')
    ratings = lines.map(lambda x: x.split()[2])
    # countByValue returns a plain dict on the driver, so it stays usable after stop().
    result = ratings.countByValue()

sortedResults = collections.OrderedDict(sorted(result.items()))
for key, value in sortedResults.items():
    print("%s %i" % (key, value))

In [None]:
from pyspark import SparkConf, SparkContext
import collections

# Local, single-threaded Spark context for the minimum-temperature job.
conf = (
    SparkConf()
    .setMaster('local')
    .setAppName('MinimumTemperature')
)
sc = SparkContext(conf=conf)

def parseLine(line):
    """Parse one comma-separated weather record.

    Returns a tuple ``(station_id, entry_type, temperature)`` built from
    columns 0, 2 and 3. The raw reading in column 3 is scaled by 0.1 and then
    passed through the Celsius-to-Fahrenheit formula (x * 9/5 + 32).
    """
    cols = line.split(',')
    station, kind, raw_reading = cols[0], cols[2], cols[3]
    fahrenheit = float(raw_reading) * 0.1 * (9.0 / 5.0) + 32.0
    return station, kind, fahrenheit

# Per-station minimum temperature from 1800.csv, printed in Fahrenheit.
try:
    lines = sc.textFile('1800.csv')
    parsedLines = lines.map(parseLine)
    # Exact match on the entry type: a substring test ('TMIN' in ...) would also
    # accept any longer code that merely contains "TMIN".
    tminRecords = parsedLines.filter(lambda x: x[1] == 'TMIN')
    stationTemps = tminRecords.map(lambda x: (x[0], x[2]))
    minTemps = stationTemps.reduceByKey(min)

    results = minTemps.collect()

    for result in results:
        print(result[0] + "\t{:.2f}F".format(result[1]))
finally:
    # Always release the context, even if a job fails, so later cells can
    # create their own SparkContext.
    sc.stop()

In [None]:
from pyspark import SparkConf, SparkContext
import collections

# Spark context for the maximum-temperature job (local master).
conf = SparkConf().setAppName('MaxTemperature').setMaster('local')
sc = SparkContext(conf=conf)

def parseLine(line):
    """Turn a CSV weather line into ``(station_id, entry_type, temperature)``.

    Uses columns 0, 2 and 3; the column-3 reading is multiplied by 0.1 and
    converted with the Celsius-to-Fahrenheit formula (x * 9/5 + 32).
    NOTE(review): same definition as the parseLine in the minimum-temperature
    cell; consider defining it once near the top of the notebook.
    """
    fields = line.split(',')
    temperature = float(fields[3]) * 0.1 * (9.0 / 5.0) + 32.0
    return (fields[0], fields[2], temperature)

# Per-station maximum temperature from 1800.csv, printed in Fahrenheit.
try:
    lines = sc.textFile('1800.csv')
    parsedLines = lines.map(parseLine)
    # Exact match on the entry type: a substring test ('TMAX' in ...) would also
    # accept any longer code that merely contains "TMAX".
    tmaxRecords = parsedLines.filter(lambda x: x[1] == 'TMAX')
    stationTemps = tmaxRecords.map(lambda x: (x[0], x[2]))
    maxTemps = stationTemps.reduceByKey(max)

    results = maxTemps.collect()

    for result in results:
        print(result[0] + "\t{:.2f}F".format(result[1]))
finally:
    # Always release the context, even if a job fails, so later cells can
    # create their own SparkContext.
    sc.stop()
     

In [None]:
import re
from pyspark import SparkConf, SparkContext
import collections

# Compiled once at module level; splits on runs of non-word characters.
_WORD_SPLIT = re.compile(r'\W+', re.UNICODE)


def normalizeWords(text):
    """Lower-case ``text`` and split it into words on runs of non-word characters.

    Empty tokens are dropped. ``re.split`` produces them when the text starts
    or ends with a non-word character (e.g. trailing punctuation), and without
    filtering they would be counted as a word "".

    Returns a list of lower-cased word strings (possibly empty).
    """
    return [word for word in _WORD_SPLIT.split(text.lower()) if word]

# Count word frequencies in book.txt and print them from most to least frequent.
conf = SparkConf().setMaster('local').setAppName('WordCount')

# The with-block stops the context even if a job fails, so later cells can
# still create their own SparkContext.
with SparkContext(conf=conf) as sc:
    # Named book_lines rather than `input`, which would shadow the builtin
    # input() for the rest of the notebook.
    book_lines = sc.textFile('book.txt')
    words = book_lines.flatMap(normalizeWords)

    wordCounts = words.map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)
    # Swap to (count, word) so sortByKey(False) orders by descending count.
    wordCountsSorted = wordCounts.map(lambda x: (x[1], x[0])).sortByKey(False)
    results = wordCountsSorted.collect()

for count, word in results:
    # Skip the empty token. Words are printed unchanged: an ASCII
    # encode('ascii', 'ignore') round-trip would strip non-ASCII letters
    # ("café" -> "caf") and merge distinct words in the output.
    if word:
        print(word + ":\t\t" + str(count))

In [None]:
from pyspark import SparkConf, SparkContext
import collections

# Spark context for the most-popular-superhero job (local master).
conf = (SparkConf()
        .setMaster('local')
        .setAppName('PopularHero'))
sc = SparkContext(conf=conf)

def countCoOccurences(line):
    """Map one whitespace-separated graph line to ``(hero_id, co_occurrences)``.

    The first token is parsed as the integer id. Every remaining token on the
    line counts as one co-occurrence.
    """
    tokens = line.split()
    hero_id = int(tokens[0])
    return hero_id, len(tokens[1:])

def parseNames(line):
    """Parse a names line of the form ``<id> "<name>"`` into ``(id, name)``.

    Returns the id as an int and the name as a str. The name is deliberately
    not encoded to UTF-8 bytes: in Python 3, bytes printed through str() show
    up as a repr such as b'NAME' instead of the name itself.
    """
    fields = line.split('"')
    # fields[0] is the id plus trailing whitespace; int() ignores the whitespace.
    return (int(fields[0]), fields[1])

# Find the hero id with the most co-appearances in Marvel-graph.txt and print
# its name from Marvel-names.txt.
try:
    names = sc.textFile("Marvel-names.txt")
    namesRdd = names.map(parseNames)

    lines = sc.textFile("Marvel-graph.txt")

    # Sum per-line counts per id (presumably an id can start more than one line).
    pairings = lines.map(countCoOccurences)
    totalFriendsByCharacter = pairings.reduceByKey(lambda x, y: x + y)
    # Flip to (count, id) so max() compares by count first (ties -> larger id).
    flipped = totalFriendsByCharacter.map(lambda x: (x[1], x[0]))

    mostPopular = flipped.max()

    # lookup() returns a list; it is empty if the id has no entry in the names file.
    nameMatches = namesRdd.lookup(mostPopular[1])
    mostPopularName = nameMatches[0] if nameMatches else "<unknown hero id %d>" % mostPopular[1]

    print(str(mostPopularName) + " is the most popular superhero, with " + str(mostPopular[0]) + " co-appearances.")
finally:
    # Always release the context, even if a job fails, so later cells can
    # create their own SparkContext.
    sc.stop()


In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.sql.types import IntegerType, FloatType, DoubleType

# Logistic-regression classifier predicting the `Outcome` column of the Pima
# diabetes CSV from all other columns; reports ROC AUC and accuracy on a
# held-out 20% split.
LABEL_COL = 'Outcome'
SEED = 42

spark = SparkSession.builder.appName("Pima Indian Classifier").getOrCreate()
try:
    raw = spark.read.csv("pima-indians-diabetes.csv", header=True, inferSchema=True)

    # Predictors are every column except the label, selected by name rather than
    # assuming the label is the last column.
    predictors = [c for c in raw.columns if c != LABEL_COL]

    # Cast predictors to double, not integer: an IntegerType cast truncates any
    # fractional feature (e.g. 0.627 -> 0). NOTE(review): in the usual Pima file
    # this affects BMI and the pedigree-function column; confirm against the CSV.
    typed = raw.select(
        *[raw[c].cast(DoubleType()).alias(c) for c in predictors],
        raw[LABEL_COL],
    )

    assembler = VectorAssembler(inputCols=predictors, outputCol="features")
    data = assembler.transform(typed).select('features', LABEL_COL)

    train, test = data.randomSplit([0.8, 0.2], seed=SEED)
    lr = LogisticRegression(labelCol=LABEL_COL, featuresCol="features")
    model = lr.fit(train)
    predictions = model.transform(test)

    # BinaryClassificationEvaluator computes areaUnderROC (its default metric),
    # which is not accuracy; accuracy comes from the multiclass evaluator.
    auc = BinaryClassificationEvaluator(
        labelCol=LABEL_COL, metricName="areaUnderROC"
    ).evaluate(predictions)
    accuracy = MulticlassClassificationEvaluator(
        labelCol=LABEL_COL, predictionCol="prediction", metricName="accuracy"
    ).evaluate(predictions)

    print('AUC (ROC): ', auc)
    print('Accuracy: ', accuracy)
finally:
    # Stop the session even if loading or training fails.
    spark.stop()