https://www.youtube.com/watch?v=_wJ_8E6VumU&list=PLz-qytj7eIWXTqncmCCSOw-GcBu2c4K0j&ab_channel=DataScienceBasics

https://www.youtube.com/watch?v=Ys9obOxJaOM&list=PL2IsFZBGM_IGiAvVZWAEKX8gg1ItnxEEb&index=11&ab_channel=EaseWithData



### General environment and settings

#### List of installed packages

In [0]:
%python
%pip list

In [0]:
%python
%pip show pandas

#### GitHub cloning and link
In GitHub, copy the repository's HTTPS URL. In Databricks, go to Workspace -> Repos, click the 'create a Repo here' link, paste the URL into the 'Git repository URL' box, and click the Create Repo button.

### Schema manipulation
#### Add comment to schema
COMMENT ON DATABASE dev.bronze IS 'This is bronze schema in the dev database.'
#### Get the schema description
DESCRIBE DATABASE EXTENDED dev.bronze

#### Create table

In [0]:
-- Create the sale table in the dev.bronze schema. IF NOT EXISTS turns the
-- statement into a no-op when the table is already there, so the cell can be
-- re-run safely.
-- NOTE(review): price is DOUBLE; consider DECIMAL if exact currency arithmetic
-- matters for this table.
CREATE TABLE IF NOT EXISTS dev.bronze.sale (
  invoice_id STRING,
  cust_id STRING,
  product_code STRING,
  qty DOUBLE,
  price DOUBLE
);


#### Insert data

In [0]:
-- Load three sample invoices. The target columns are listed explicitly so each
-- value stays mapped to the right column even if the table's column order
-- changes later.
INSERT INTO dev.bronze.sale (invoice_id, cust_id, product_code, qty, price) VALUES
('INV1001','CUST1001','P1001',10,100),
('INV1002','CUST1002','P1002',20,200),
('INV1003','CUST1003','P1003',30,300);

#### Drop and Undrop table

In [0]:
--describe extended dev.bronze.sale;
--select * from dev.bronze.sale;
--drop table dev.bronze.sale;
--use catalog dev;
--show tables dropped in bronze;
--undrop table dev.bronze.sale;

#### Create/Update Comment of database

In [0]:
-- Set (create or replace) the comment attached to the dev.bronze schema.
COMMENT ON DATABASE dev.bronze IS 'This is bronze schema in the dev database.';
-- Show the schema's extended description to check the comment that was set.
DESCRIBE DATABASE EXTENDED dev.bronze;


#### Observe dbfs file system

In [0]:
%python
%fs ls dbfs:/Volumes/dev/bronze/glacier/

In [0]:
%python
%fs ls dbfs:/databricks-datasets/learning-spark/


In [0]:
%python
# Create the directory at the given DBFS path.
# NOTE(review): /databricks-datasets is normally a read-only sample-data mount,
# so this call may fail - confirm the intended target (e.g. dbfs:/FileStore/).
dbutils.fs.mkdirs("dbfs:/databricks-datasets/FileStore/")

### Table manipulation


#### Read table to dataframe

##### Method 1: Using Spark SQL (Most Common)

In [0]:
%python
# Switch the session to the dev.bronze schema so the query below can refer to
# the table by its bare name.
spark.sql("USE dev.bronze")

# Pull every row of the sale table into a Spark DataFrame and render it.
sale_query = "SELECT * FROM sale"
df = spark.sql(sale_query)
display(df)

##### Method 2: Using table() method

In [0]:
%python
# Load the table by name and keep only the rows with Age > 20. The table is
# addressed by its full catalog.schema.table name so the result does not depend
# on whichever catalog the session currently has selected.
df1 = spark.table("dev.default.sample_data").filter("Age > 20")
df1.display()

##### Method 3: Using DataFrameReader with table option

In [0]:
%python
       
# Open the table through a DataFrameReader, then add a boolean Adult column
# that is true for everyone older than 20.
sample_reader = spark.read.format("delta")
df3 = sample_reader.table("dev.default.sample_data")
df3 = df3.withColumn("Adult", df3["Age"] > 20)
df3.display()

##### Convert df to Pandas

In [0]:
%python
       
# Rebuild the Spark DataFrame with the derived Adult flag.
df3 = (
    spark.read
    .format("delta")
    .table("dev.default.sample_data")
)
df3 = df3.withColumn("Adult", df3["Age"] > 20)

# Convert the Spark DataFrame into a pandas DataFrame.
df4 = df3.toPandas()
# This is the pandas describe(); compare it with df3.describe() on the Spark
# side, which behaves differently. As the last expression of the cell, its
# result becomes the cell output.
df4.describe()

#### Uploading csv to Volume
New -> Add or upload data -> Upload files to a volume -> Destination path -> Create volume -> Volume name -> Create -> Upload

In [0]:
%python
%fs ls dbfs:/Volumes/dev/bronze/glacier/

In [0]:
%python
import pandas as pd

# Read the CSV that was uploaded to the volume straight into pandas.
df4 = pd.read_csv("/Volumes/dev/bronze/glacier/glaciers.csv")
# Render with the notebook's display() function: .display() is not a method
# defined by the pandas DataFrame API, so the function form is the portable one.
display(df4)

#### Downloading a CSV from a website

In [0]:
%python
import requests

url = "https://datahub.io/core/glacier-mass-balance/_r/-/data/glaciers.csv"
local_path = "/Volumes/dev/bronze/glacier/glaciers2.csv"

# Download the file. The timeout keeps the cell from hanging forever on a
# stalled connection, and raise_for_status() aborts on an HTTP error before
# anything is written, so an error page never gets saved as the CSV.
response = requests.get(url, timeout=60)
response.raise_for_status()
with open(local_path, "wb") as csv_file:
    csv_file.write(response.content)

# Read the CSV file using Spark: first line is the header, column types are
# inferred from the data.
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(local_path)
)

display(df)

### Aggregation functions

In [0]:
%python

import pyspark.sql.functions as F

# Summary statistics computed per Adult group; the list order is the order of
# the output columns. Relies on df3 (with its Adult column) from an earlier cell.
age_metrics = [
    F.sum("Age").alias("age_sum"),
    F.avg("Age").alias("age_avg"),
    F.count("Name").alias("total_names"),
    F.min("Age").alias("age_min"),
    F.max("Age").alias("age_max"),
]
df3_agg = df3.groupBy("Adult").agg(*age_metrics)
df3_agg.display()

### Window functions

#### Running total

In [0]:
%python
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Reload the sample table and derive the Adult flag used for partitioning.
df3 = spark.read.format("delta").table("dev.default.sample_data")
df3 = df3.withColumn("Adult", df3["Age"] > 20)

# Within each Adult group, order rows by Name descending; the frame runs from
# the first row of the partition up to and including the current row.
window_spec = (
    Window.partitionBy("Adult")
    .orderBy(F.desc("Name"))
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
running_age_sum = F.sum("Age").over(window_spec)
df3_running_total = df3.withColumn("running_total", running_age_sum)
df3_running_total.display()

#### Moving average

In [0]:
%python
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Reload the sample table and derive the Adult flag used for partitioning.
df3 = spark.read\
    .format("delta")\
    .table("dev.default.sample_data")
df3 = df3.withColumn("Adult", df3.Age > 20)

# Two-row frame: the previous row (-1) and the current row (0), ordered by Name
# inside each Adult group.
window_spec = Window.partitionBy("Adult").orderBy(F.col("Name")).rowsBetween(-1, 0)
# Named after what it holds (a moving average) instead of the copy-pasted
# "runk" name used by the ranking cells.
df3_moving_avg = df3.withColumn("moving_avg", F.avg("Age").over(window_spec))
df3_moving_avg.display()

#### Row number

In [0]:
%python
from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Reload the sample table and derive the Adult flag used for partitioning.
df3 = (
    spark.read
    .format("delta")
    .table("dev.default.sample_data")
)
df3 = df3.withColumn("Adult", df3["Age"] > 20)

# Number the rows 1..n inside each Adult group, following Name order.
window_spec_rank = Window.partitionBy("Adult").orderBy(F.col("Name"))
row_number_col = F.row_number().over(window_spec_rank)
df3_runk = df3.withColumn("row_number", row_number_col)
df3_runk.display()

#### Rank, Dense Rank, add rows

In [0]:
%python
# Window and F are imported here so the cell runs on its own instead of relying
# on an earlier cell having imported them.
from pyspark.sql import Row
from pyspark.sql.window import Window
import pyspark.sql.functions as F

df3 = spark.read\
    .format("delta")\
    .table("dev.default.sample_data")

# Append one extra person, matching columns by name, before deriving the Adult
# flag so the new row gets a value for it too.
new_person = spark.createDataFrame([Row(Name="Bob", Age=15, City="Toronto")])
df3 = df3.unionByName(new_person)
df3 = df3.withColumn("Adult", df3.Age > 20)

# Rank rows by Name (ascending) inside each Adult group.
window_spec_rank = (
    Window.partitionBy("Adult")
    .orderBy(F.col("Name").asc())
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)
df3_runk = df3.withColumn("rank", F.rank().over(window_spec_rank))
df3_runk.display()
# Same window, dense_rank() instead of rank(), shown separately for comparison.
df3_dense_runk = df3.withColumn("dense_rank", F.dense_rank().over(window_spec_rank))
df3_dense_runk.display()

### Joins

In [0]:
%python
import pyspark.sql.functions as F

# Load both sides of the join from the bakehouse sample schema.
transDF = spark.sql("select * from samples.bakehouse.sales_transactions")
franchiseDF = spark.sql("select * from samples.bakehouse.sales_franchises")

# Alias each side so the identically named franchiseID columns can be told
# apart in the join condition. The un-aliased equivalent would be
# franchiseDF.franchiseID == transDF.franchiseID.
franchises = franchiseDF.alias("fr")
transactions = transDF.alias("tr")
same_franchise = F.col("fr.franchiseID") == F.col("tr.franchiseID")

joined = franchises.join(transactions, same_franchise, "inner")
joined.limit(10).display()

### Read file from DBFS

In [0]:
%python
# Read the sample tweet file with the JSON data source. The "header" and
# "inferSchema" options were dropped: they belong to the CSV reader and have no
# effect on JSON, whose schema is inferred from the data when none is supplied.
df = spark.read.format("json").load("dbfs:/databricks-datasets/learning-spark/data-001/testweet.json")
df.display()

### Collect() method

In [0]:
%python
from pyspark.sql import Row

# Load the sample table as a Spark DataFrame.
df3 = (
    spark.read
    .format("delta")
    .table("dev.default.sample_data")
)
# collect() brings every row back to the driver as a Python list of Row objects.
df3_col = df3.collect()
# Last expression of the cell: the first Row becomes the cell output.
df3_col[0]

In [0]:
%python
# Rows can be subscripted by position or by column name. Relies on df3_col from
# the previous cell.
first_row = df3_col[0]
# Printed explicitly: only the last expression of a cell is echoed, so a bare
# expression on this line would be evaluated and silently discarded.
print(first_row[0])
# Name from the first row, joined to the second column of the second row.
f"{first_row['Name']}_{df3_col[1][1]}"

In [0]:
%python
