<a href="https://colab.research.google.com/github/saishshinde15/PySpark_Codes/blob/main/Read_Write_In_Lakehouse_Table.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
pip install pyspark



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.utils import AnalysisException

In [3]:
# Create (or reuse, if one is already running in this kernel) the Spark session.
spark = (
    SparkSession.builder
    .appName("Read/Write_To_Lakehouse_table")
    .getOrCreate()
)

In [12]:
# Load the sample CSV; the first line supplies the column names. No schema
# inference is requested, so every column comes back as a string.
df = (
    spark.read
    .option("header", True)
    .csv('/content/property-sales.csv')
)

In [13]:
# Print the rows as a text table (defaults spelled out: up to 20 rows, long values truncated).
df.show(n=20, truncate=True)

+-------------------+--------------+-----------+-------------+-----------------+
|           Address |          Type|      City |SalePrice ($)|            Agent|
+-------------------+--------------+-----------+-------------+-----------------+
|   1 Rowley Street |Detached House|   New York|       745000|Penelope Pullman |
|13a lollipop avenue|     Apartment|Los Angeles|       345000|      Jack Smith |
|       34 the drive|         House|    Atlanta|       459000|     Sheila Sammi|
+-------------------+--------------+-----------+-------------+-----------------+



In [14]:
# In plain Jupyter/Colab, display() on a Spark DataFrame only shows its repr
# (column names and types, no rows), as the output below shows; use df.show() to see the data.
display(df)

DataFrame[Address : string, Type: string, City : string, SalePrice ($): string, Agent: string]

In [15]:
# Column names written to a Delta/lakehouse table should not contain spaces.
# In this schema "Address " and "City " carry trailing spaces, and "SalePrice ($)"
# contains a space plus parentheses (Delta also rejects characters such as
# ",;{}()=" in column names unless column mapping is enabled), so these columns
# must be renamed before writing.
df.printSchema()

root
 |-- Address : string (nullable = true)
 |-- Type: string (nullable = true)
 |-- City : string (nullable = true)
 |-- SalePrice ($): string (nullable = true)
 |-- Agent: string (nullable = true)



In [16]:
# (column name, Spark type) pairs: note the trailing spaces in 'Address ' and
# 'City ', and that every column was read as string.
df.dtypes

[('Address ', 'string'),
 ('Type', 'string'),
 ('City ', 'string'),
 ('SalePrice ($)', 'string'),
 ('Agent', 'string')]

In [22]:
# Make every column name lakehouse-safe before writing to Delta.
# Strip stray leading/trailing whitespace from *all* headers instead of renaming
# "Address " and "City " by hand: withColumnRenamed() is a silent no-op when the
# old name doesn't match exactly, so a hard-coded rename can miss a column and
# the problem would only surface later, at write time.
df = df.toDF(*[name.strip() for name in df.columns]) \
       .withColumnRenamed("SalePrice ($)", "SalePrice_USD")

# Fail fast here rather than inside saveAsTable(): Delta rejects these
# characters in column names unless column mapping is enabled.
DELTA_INVALID_COLUMN_CHARS = set(" ,;{}()\n\t=")
bad_columns = [name for name in df.columns if DELTA_INVALID_COLUMN_CHARS & set(name)]
assert not bad_columns, f"columns not valid for Delta: {bad_columns}"


In [23]:
# Print the rows again under the cleaned-up column names (defaults spelled out:
# up to 20 rows, long values truncated).
df.show(n=20, truncate=True)

+-------------------+--------------+-----------+-------------+-----------------+
|            Address|          Type|       City|SalePrice_USD|            Agent|
+-------------------+--------------+-----------+-------------+-----------------+
|   1 Rowley Street |Detached House|   New York|       745000|Penelope Pullman |
|13a lollipop avenue|     Apartment|Los Angeles|       345000|      Jack Smith |
|       34 the drive|         House|    Atlanta|       459000|     Sheila Sammi|
+-------------------+--------------+-----------+-------------+-----------------+



In [24]:
# Confirm the renamed columns no longer contain spaces or parentheses.
df.printSchema()

root
 |-- Address: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- City: string (nullable = true)
 |-- SalePrice_USD: string (nullable = true)
 |-- Agent: string (nullable = true)



## Managed Tables

In [25]:
# Name under which the managed table is registered in the catalog.
delta_table_name = 'PropertySales'

# format("delta") writes a Delta Lake table, whose data is stored as Parquet files.
# saveAsTable() registers it in the catalog as a *managed* table: no explicit
# path is given, so the catalog decides where the files live.
# NOTE: this needs a Spark session with Delta Lake available (e.g. a lakehouse).
# In plain Colab no Delta package is installed, so it raises
# [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta.
writer = df.write.format("delta").mode("overwrite")
writer.saveAsTable(delta_table_name)


Py4JJavaError: An error occurred while calling o983.saveAsTable.
: org.apache.spark.SparkClassNotFoundException: [DATA_SOURCE_NOT_FOUND] Failed to find the data source: delta. Please find packages at `https://spark.apache.org/third-party-projects.html`.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.dataSourceNotFoundError(QueryExecutionErrors.scala:725)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:647)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSourceV2(DataSource.scala:697)
	at org.apache.spark.sql.DataFrameWriter.lookupV2Provider(DataFrameWriter.scala:873)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:568)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.ClassNotFoundException: delta.DefaultSource
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$5(DataSource.scala:633)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.sql.execution.datasources.DataSource$.$anonfun$lookupDataSource$4(DataSource.scala:633)
	at scala.util.Failure.orElse(Try.scala:224)
	at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:633)
	... 15 more


In [None]:
 #these are four different write 'modes'

# append the new dataframe to the existing Table
df.write.mode("append").format("delta").saveAsTable(delta_table_name)

# overwrite existing Table with new DataFrame
df.write.mode("overwrite").format("delta").saveAsTable(delta_table_name)

# Throw error if data already exists
df.write.mode("error").format("delta").saveAsTable(delta_table_name)

# Fail silently if data already exists
df.write.mode("ignore").format("delta").saveAsTable(delta_table_name)

## Unmanaged Tables


In [None]:
# "Unmanaged" write: store the DataFrame as Delta files at an explicit path.
# Unlike saveAsTable(), save() registers nothing in the catalog; only the files
# are written. To get a catalog entry pointing at this location, use
# saveAsTable(<name>, path=...) instead.
# The path is relative and presumably resolves inside the lakehouse's Files area
# (TODO confirm for the target environment).
unmanaged_path = "Files/delta/unmanaged.delta"
df.write.format("delta").mode("overwrite").save(unmanaged_path)


In [26]:
# Bare expression: Jupyter shows the DataFrame repr (schema only), confirming the renamed columns.
df

DataFrame[Address: string, Type: string, City: string, SalePrice_USD: string, Agent: string]

In [27]:
# Convert the Spark DataFrame into a pandas-on-Spark DataFrame
# (pyspark.pandas.frame.DataFrame), which offers pandas-style methods on top of Spark.
import pyspark.pandas as ps  # pandas-on-Spark namespace

# DataFrame.pandas_api() is the supported replacement for the deprecated
# DataFrame.to_pandas_on_spark().
psdf = df.pandas_api()

# Rich display of the first rows instead of a plain-text print().
display(psdf.head())

# info() prints its report itself and returns None; wrapping it in print()
# appended a stray "None" to the output, so call it directly.
psdf.info()



               Address            Type         City SalePrice_USD              Agent
0     1 Rowley Street   Detached House     New York        745000  Penelope Pullman 
1  13a lollipop avenue       Apartment  Los Angeles        345000        Jack Smith 
2         34 the drive           House      Atlanta        459000       Sheila Sammi
<class 'pyspark.pandas.frame.DataFrame'>
Int64Index: 3 entries, 0 to 2
Data columns (total 5 columns):
 #   Column         Non-Null Count  Dtype 
---  ------         --------------  ----- 
 0   Address        3 non-null      object
 1   Type           3 non-null      object
 2   City           3 non-null      object
 3   SalePrice_USD  3 non-null      object
 4   Agent          3 non-null      object
dtypes: object(5)None
