In [1]:
from pyspark import SparkContext

In [2]:
sc=SparkContext("local", "PySparkIntro")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/17 08:39:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
from pyspark.sql import SparkSession


In [18]:
spark=SparkSession.builder.appName("PySparkIntro").getOrCreate()

In [5]:
data=[1,2,3,4,5]

In [6]:
rdd=sc.parallelize(data)

In [7]:
squared_rdd=rdd.map(lambda x:x**2)

In [8]:
even_rdd=rdd.map(lambda x:x%2==0)

In [9]:
even_rdd=rdd.filter(lambda x:x%2==0)

In [10]:
collected_data=squared_rdd.collect()

                                                                                

In [11]:
num_elements=squared_rdd.count()

In [12]:
from pyspark.sql.types import StructType,StructField,StringType,IntegerType

In [13]:
schema=StructType([
    StructField("Name", StringType(),True),
    StructField("Age",IntegerType(), True),
    StructField("Salary",IntegerType(),True)
])

In [14]:
data=[("Alice", 23, 4540000), ("Bobs", 34,6790000), ("Cath", 33, 66777000)]

In [19]:
ds=spark.createDataFrame(data,schema)

In [20]:
ds=ds.alias("employees")

In [21]:
ds.show()

+-----+---+--------+
| Name|Age|  Salary|
+-----+---+--------+
|Alice| 23| 4540000|
| Bobs| 34| 6790000|
| Cath| 33|66777000|
+-----+---+--------+



In [None]:
data_file="path/to/data.csv"
df=spark.read.csv(data_file, header=True, inferSchema=True)

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/Users/vishakhachaudhary/Desktop/Pyspark/path/to/data.csv.

In [28]:
data=[("Alice", 23, 4540000), ("Bobs", 34,6790000), ("Cath", 33, 66777000)]
df=spark.createDataFrame(data,["Name", "Age", "Salary"])
df.show()

+-----+---+--------+
| Name|Age|  Salary|
+-----+---+--------+
|Alice| 23| 4540000|
| Bobs| 34| 6790000|
| Cath| 33|66777000|
+-----+---+--------+



In [30]:
json_data_file="path/to/data.json"
df_json=spark.read.json(json_data_file)

KeyboardInterrupt: 

In [None]:
xml_data_file="path/to/data.xml"
df_xml=spark.read.format("xml").option("rowTag","employee").load(xml_data_file)

In [32]:
data_with_missing=[("Alice",None, 45000), ("Bob", 34,None), ("Cath", 23, 44000)]

In [33]:
df_missing=spark.createDataFrame(data_with_missing, ["Name", "Age", "Salary"])

In [50]:
mean_age=df_missing.select("Age").agg({"Age":"avg"}).collect()[0][0]

In [51]:
df_cleaned=df_missing.na.fill(mean_age, subset=["Age"])

In [52]:
df_cleaned.show()

+-----+---+------+
| Name|Age|Salary|
+-----+---+------+
|Alice| 28| 45000|
|  Bob| 34|  NULL|
| Cath| 23| 44000|
+-----+---+------+



In [56]:
from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.ml.feature import VectorAssembler

In [55]:
!pip install numpy

17200.06s - pydevd: Sending message related to process being replaced timed-out after 5 seconds
Collecting numpy
  Downloading numpy-1.26.2-cp312-cp312-macosx_11_0_arm64.whl.metadata (61 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m61.2/61.2 kB[0m [31m272.1 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hDownloading numpy-1.26.2-cp312-cp312-macosx_11_0_arm64.whl (13.7 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m13.7/13.7 MB[0m [31m6.0 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: numpy
Successfully installed numpy-1.26.2


In [57]:
df=spark.createDataFrame(data, ["Name", "Age", "Salary"])

In [58]:
assembler=VectorAssembler(inputCols=["Age","Salary"], outputCol="features")

In [59]:
data_for_scaling=assembler.transform(df)

In [60]:
data_for_scaling.show()

+-----+---+--------+----------------+
| Name|Age|  Salary|        features|
+-----+---+--------+----------------+
|Alice| 23| 4540000|[23.0,4540000.0]|
| Bobs| 34| 6790000|[34.0,6790000.0]|
| Cath| 33|66777000| [33.0,6.6777E7]|
+-----+---+--------+----------------+



In [61]:
scaler_min_max=MinMaxScaler(inputCol="features", outputCol="scaled_features")

In [62]:
scaler_min_max=MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaled_min_max= scaler_min_max.fit(data_for_scaling).transform(data_for_scaling)

In [63]:
scaled_min_max.show()

+-----+---+--------+----------------+--------------------+
| Name|Age|  Salary|        features|     scaled_features|
+-----+---+--------+----------------+--------------------+
|Alice| 23| 4540000|[23.0,4540000.0]|           (2,[],[])|
| Bobs| 34| 6790000|[34.0,6790000.0]|[1.0,0.0361521281...|
| Cath| 33|66777000| [33.0,6.6777E7]|[0.90909090909090...|
+-----+---+--------+----------------+--------------------+



In [66]:
scaler_standardization= StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)

In [68]:
scaled_standardization=scaler_standardization.fit(data_for_scaling).transform(data_for_scaling)

In [69]:
scaled_standardization.show()

+-----+---+--------+----------------+--------------------+
| Name|Age|  Salary|        features|     scaled_features|
+-----+---+--------+----------------+--------------------+
|Alice| 23| 4540000|[23.0,4540000.0]|[-1.1507929111375...|
| Bobs| 34| 6790000|[34.0,6790000.0]|[0.65759594922142...|
| Cath| 33|66777000| [33.0,6.6777E7]|[0.49319696191607...|
+-----+---+--------+----------------+--------------------+



23/11/17 14:19:14 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 926926 ms exceeds timeout 120000 ms
23/11/17 14:19:14 WARN SparkContext: Killing executors is not supported by current scheduler.
23/11/17 14:19:20 ERROR Inbox: Ignoring error
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:102)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:110)
	at org.apache.spark.util.RpcUtils$.makeDriverRef(RpcUtils.scala:36)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.driverEndpoint$lzycompute(BlockManagerMasterEndpoint.scala:124)
	at org.apache.spark.storage.BlockManagerMasterEndpoint.org$apache$spark$storage$BlockManagerMasterEndpoint$$

In [70]:
sc

ConnectionRefusedError: [Errno 61] Connection refused

<SparkContext master=local appName=PySparkIntro>

In [75]:
sc1=spark.sparkContext

In [77]:
sc1

AttributeError: 'NoneType' object has no attribute 'sc'

<SparkContext master=local appName=PySparkIntro>

In [74]:
sc1.stop()

ConnectionRefusedError: [Errno 61] Connection refused

In [82]:
sc1.show_profiles

<bound method SparkContext.show_profiles of <SparkContext master=local appName=PySparkIntro>>

In [83]:
from pyspark.sql import SparkSession

ConnectionRefusedError: [Errno 61] Connection refused