In [0]:

df_poc = spark.sql("select * from uc_poc.poc_stg.customers_stg" )


In [0]:
df_poc.printSchema()

In [0]:
from pyspark.sql.functions import explode, col
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, ArrayType, FloatType

data = {
    "id": 1,
    "name": "Alice",
    "orders": [
        {
            "order_id": "100",
            "items": [
                {"item_id": "a1", "price": 10},
                {"item_id": "a2", "price": 15}
            ]
        },
        {
            "order_id": "101",
            "items": [
                {"item_id": "b1", "price": 7}
            ]
        }
    ]
}

schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("orders", ArrayType(
        StructType([
            StructField("order_id", StringType(), True),
            StructField("items", ArrayType(
                StructType([
                    StructField("item_id", StringType(), True),
                    StructField("price", FloatType(), True)
                ])
            ), True)
        ])
    ), True)
])

df = spark.createDataFrame([data], schema=schema)
df_orders = df.select("id", "name", explode("orders").alias("order"))
df_items = df_orders.select(
    "id",
    "name",
    col("order.order_id").alias("order_id"),
    explode("order.items").alias("item")
).select(
    "id",
    "name",
    "order_id",
    col("item.item_id").alias("item_id"),
    col("item.price").alias("price")
)

display(df_items)

In [0]:
%sql
SELECT a.Country || '-' || b.Country AS Country_Pair
FROM VALUES ('Ind'), ('SL'), ('Pak'), ('Ban') AS a(Country),
     VALUES ('Ind'), ('SL'), ('Pak'), ('Ban') AS b(Country)
WHERE a.Country < b.Country
ORDER BY a.Country, b.Country;

In [0]:
%sql
CREATE OR REPLACE TEMP VIEW Country AS
SELECT 'Ind' AS Country UNION ALL
SELECT 'SL' UNION ALL
SELECT 'Pak' UNION ALL
SELECT 'Ban';

SELECT concat(a.Country,"-",  b.Country) AS Country_Pair
FROM Country a, Country b
WHERE a.Country < b.Country;

In [0]:
from pyspark.sql.functions import split, explode, trim,col

data = [
    (1, "Gaurav","Pune,Bangalore,Hyderabad"),
    (2, "Risabh","Mumbai,Bangalore,Pune")
]

columns = ["EmpId", "Name", "Locations"]

df = spark.createDataFrame(data, columns)
df_split  = df.withColumn("listlocation",split(col("Locations"),","))
display(df_split)
 
 
df_output = df_split.select("empid","Name",explode(col("listlocation")).alias("locations")) 

#df_split = df.withColumn("Location", explode(split("Locations", ",")))
#df_final = df_split.withColumn("Location", trim(df_split["Location"])).select("EmpId", "Name", "Location")

display(df_output)

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col
data = [(1,), (2,), (4,), (5,), (-1,), (-2,), (-3,), (-4,)]

columns = ["colname"]

df = spark.createDataFrame(data, columns)

df_positive = df.filter(col("colname") > 0).withColumnRenamed("colname", "positive_column")
df_negative = df.filter(col("colname") < 0).withColumnRenamed("colname", "negative_col")

df_output = df_negative.withColumn("row_num", row_number().over(Window.orderBy("negative_col"))) \
    .join(df_positive.withColumn("row_num", row_number().over(Window.orderBy("positive_column"))), "row_num") \
    .select("negative_col", "positive_column")

display(df_output)

In [0]:
%sql
CREATE TABLE IF NOT EXISTS sales_data (
product_name VARCHAR( 100), category VARCHAR(50), sales DECIMAL (10, 2)
);
INSERT
INTO sales_data(product_name, category, sales)
VALUES
('Product A','Category 1',1000.00),
('Product B', 'Category 1', 2000.00),
('Product C', 'Category 2', 1500.00),
('Product D', 'Category 2', 2500.00)




In [0]:
%sql
select *, cast(sum(sales) over(partition by category) as string) as total_sales from sales_data

In [0]:
%sql
with salesrank as
(select product_name,category,sales ,dense_rank() over (partition by category order by sales desc) as rank  from sales_data)

select * from salesrank where rank=2

In [0]:
from collections import Counter

l1 = [1, 1, 2, 2, 3, 4, 4, 4, 5, 6]

result = dict(Counter(l1))
display(result)

In [0]:
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col
data = [(1,), (2,), (4,), (5,), (-1,), (-2,), (-3,), (-4,)]

columns = ["colname"]

df = spark.createDataFrame(data, columns)
display(df)

df_positive = df.filter(col("colname") > 0).withColumnRenamed("colname", "positive_column")
df_negative = df.filter(col("colname") < 0).withColumnRenamed("colname", "negative_column")

df_output = df_positive.withColumn("id",row_number().over(Window.orderBy("positive_column"))).join(df_negative.withColumn("id",row_number().over(Window.orderBy("negative_column"))),"id").select("negative_column","positive_column")
display(df_output)