In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) the Spark session used by the rest of the notebook.
spark = (
    SparkSession.builder
    .appName("EmployeesSQL")
    .getOrCreate()
)

# Load the employee CSV: first row supplies column names, column types are inferred.
df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("employees.csv")
)

# Expose the DataFrame to Spark SQL under the name "employees", then query it.
df.createOrReplaceTempView("employees")
result = spark.sql("SELECT name FROM employees WHERE salary > 55000")
result.show()

+-----+
| name|
+-----+
|  Bob|
|Carol|
+-----+



In [2]:
import csv

# 0-based positions of the fields this cell reads from each employees.csv row.
NAME_COL = 1
SALARY_COL = 3
# Only rows with a salary strictly above this value are kept.
SALARY_THRESHOLD = 55000


def parse_name_salary(line):
    """Turn one CSV data line into a ``(name, salary)`` tuple.

    The line is split with the ``csv`` module rather than ``str.split(",")``
    so that a quoted field containing a comma stays in one piece.

    Raises:
        IndexError: if the line has fewer fields than the columns read here.
        ValueError: if the salary field is not an integer.
    """
    fields = next(csv.reader([line]))
    return fields[NAME_COL], int(fields[SALARY_COL])


rdd = spark.sparkContext.textFile("employees.csv")
header = rdd.first()
# Drop the header row, and skip blank lines so they cannot crash the parser.
data_rdd = rdd.filter(lambda line: line != header and line.strip() != "")
parsed_rdd = data_rdd.map(parse_name_salary)
filtered = parsed_rdd.filter(lambda x: x[1] > SALARY_THRESHOLD)
filtered.collect()

[('Bob', 60000), ('Carol', 65000)]

In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression

# --- Fit: regress `salary` on the single feature `years_experience` ---
df = spark.read.options(header=True, inferSchema=True).csv("experience_salary.csv")
# Spark ML estimators expect the inputs packed into one vector column.
vec_assembler = VectorAssembler(outputCol="features", inputCols=["years_experience"])
data = vec_assembler.transform(df)
lr = LinearRegression(labelCol="salary", featuresCol="features")
model = lr.fit(data)
print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)

# --- Predict: score one new point (years_experience = 8) ---
new_data_point = spark.createDataFrame([[8]], ["years_experience"])
# Reuse the same assembler so the new row gets an identical "features" column.
predicted_data_with_features = vec_assembler.transform(new_data_point)
pred_result = model.transform(predicted_data_with_features)
pred_result.select("prediction").show()

Coefficients: [5142.8571428571395]
Intercept: 34142.85714285715
+-----------------+
|       prediction|
+-----------------+
|75285.71428571426|
+-----------------+



In [5]:
from pyspark.ml.classification import LinearSVC
# Imported here as well so this cell does not depend on another cell having
# run first to bring VectorAssembler into the kernel namespace.
from pyspark.ml.feature import VectorAssembler

# Load the labelled points; first row supplies column names, types are inferred.
df = spark.read.csv("svm_data.csv", header=True, inferSchema=True)
# Pack the two input columns into the single vector column the estimator reads.
vec_assembler = VectorAssembler(inputCols=["feature1", "feature2"], outputCol="features")
data = vec_assembler.transform(df)

# Fit a linear SVM on `features` against the `label` column.
svm = LinearSVC(featuresCol="features", labelCol="label")
model = svm.fit(data)
print("Coefficients:", model.coefficients)
print("Intercept:", model.intercept)

# NOTE: predictions are computed on the same rows the model was fitted on,
# so this table shows the training fit, not held-out accuracy.
predictions = model.transform(data)
predictions.select("features", "label", "prediction").show()

Coefficients: [1.1144812683762066,1.5208462218869392]
Intercept: -5.6310353600381875
+---------+-----+----------+
| features|label|prediction|
+---------+-----+----------+
|[0.5,1.0]|    0|       0.0|
|[1.5,1.8]|    0|       0.0|
|[1.0,2.0]|    0|       0.0|
|[3.0,3.5]|    1|       1.0|
|[2.0,3.0]|    1|       1.0|
|[2.5,3.2]|    1|       1.0|
|[3.5,4.0]|    1|       1.0|
|[0.2,0.5]|    0|       0.0|
+---------+-----+----------+



In [6]:
from pyspark.ml.clustering import KMeans
# Imported here as well so this cell does not depend on another cell having
# run first to bring VectorAssembler into the kernel namespace.
from pyspark.ml.feature import VectorAssembler

# Load the 2-D points; first row supplies column names, types are inferred.
df = spark.read.csv("points.csv", header=True, inferSchema=True)
# Pack the x/y coordinates into the single vector column the estimator reads.
vec_assembler = VectorAssembler(inputCols=["x", "y"], outputCol="features")
data = vec_assembler.transform(df)

# Two clusters with a fixed seed; featuresCol is spelled out to match the
# other estimator cells (it is the same column KMeans reads by default).
kmeans = KMeans(featuresCol="features", k=2, seed=1)
model = kmeans.fit(data)
centers = model.clusterCenters()
print("Cluster Centers:", centers)

# Assign every input point to a cluster index and show the result.
predictions = model.transform(data)
predictions.select("x", "y", "prediction").show()

Cluster Centers: [array([3.9, 5.1]), array([1.25, 1.5 ])]
+---+---+----------+
|  x|  y|prediction|
+---+---+----------+
|1.0|1.0|         1|
|1.5|2.0|         1|
|3.0|4.0|         0|
|5.0|7.0|         0|
|3.5|5.0|         0|
|4.5|5.0|         0|
|3.5|4.5|         0|
+---+---+----------+



In [7]:
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.mllib.linalg.distributed import RowMatrix

# Read the documents file as a one-column DataFrame and name that column "text".
df = spark.read.text("lsi_docs.txt").toDF("text")

# Split each document into words, then turn the word lists into term-count vectors.
tokenizer = Tokenizer(outputCol="words", inputCol="text")
words_data = tokenizer.transform(df)
cv = CountVectorizer(outputCol="features", inputCol="words")
model = cv.fit(words_data)
result = model.transform(words_data)

# RowMatrix comes from the RDD-based mllib package, so drop down to an RDD
# of dense arrays (one array per document) before building the matrix.
rows = result.select("features").rdd.map(lambda row: row["features"].toArray())
row_matrix = RowMatrix(rows)

# Rank-2 SVD; computeU=True asks for the U factor in addition to s and V.
svd = row_matrix.computeSVD(2, computeU=True)
print("Singular Values:", svd.s)
print("First 2 rows of U:")
svd.U.rows.take(2)

Singular Values: [4.344585204685772,3.022309029060833]
First 2 rows of U:


[DenseVector([0.555, -0.1307]), DenseVector([0.5909, 0.073])]