In [151]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, LongType, DateType, TimestampType
from pyspark.sql.functions import upper, pandas_udf, expr

from datetime import datetime, date
import pandas as pd

In [152]:
# import os, sys
# os.environ["PYSPARK_PYTHON"] = r"C:\Users\Lenovo\miniconda3\python.exe"
# os.environ["PYSPARK_DRIVER_PYTHON"] = r"C:\Users\Lenovo\miniconda3\python.exe"


# spark = SparkSession.builder.getOrCreate()

spark = (
    SparkSession.builder
    .appName("PandasToSparkTest")
    .master("local[*]")  # dùng toàn bộ CPU
    .config("spark.driver.bindAddress", "127.0.0.1")
    .config("spark.driver.host", "127.0.0.1")
    .config("spark.python.worker.reuse", "false")   # tránh lỗi worker treo
    .config("spark.network.timeout", "300s")
    .config("spark.executor.heartbeatInterval", "60s")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("ERROR")
spark.conf.set("spark.sql.repl.eagerEval.enable", True)

In [153]:
df = spark.createDataFrame([
    Row(a=1, b=2., c="string1", d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c="string2", d=date(2000, 1, 2), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=3, b=4., c="string3", d=date(2000, 1, 3), e=datetime(2000, 3, 1, 12, 0)),
])

df


DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [154]:
data = [
    Row(a=1, b=2., c="string1", d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)),
    Row(a=2, b=3., c="string2", d=date(2000, 1, 2), e=datetime(2000, 1, 2, 12, 0)),
    Row(a=3, b=4., c="string3", d=date(2000, 1, 3), e=datetime(2000, 3, 1, 12, 0)),
]

schema = StructType([
    StructField("a", LongType(), True),
    StructField("b", DoubleType(), True),
    StructField("c", StringType(), True),
    StructField("d", DateType(), True),
    StructField("e", TimestampType(), True),
])

df = spark.createDataFrame(data=data, schema=schema)
df


DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [155]:
# Create spark DataFream from pandas

pd_df = pd.DataFrame({
    'a': [1, 2, 3],
    'b': [2., 3., 4.],
    'c': ['string1', 'string2', 'string3'],
    'd': [date(2000, 1, 1), date(2000, 2, 1), date(2000, 3, 1)],
    'e': [datetime(2000, 1, 1, 12, 0), datetime(2000, 1, 2, 12, 0), datetime(2000, 1, 3, 12, 0)]
})

df = spark.createDataFrame(pd_df)

df


DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [156]:
df.show()
df.show(1, vertical=True)
df.printSchema()
df.columns

print("Test new 1")
df.select(df.a, "b", "c").describe()
# df.select("a", "b", "c").describe().show()

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+

-RECORD 0------------------
 a   | 1                   
 b   | 2.0                 
 c   | string1             
 d   | 2000-01-01          
 e   | 2000-01-01 12:00:00 
only showing top 1 row
root
 |-- a: long (nullable = true)
 |-- b: double (nullable = true)
 |-- c: string (nullable = true)
 |-- d: date (nullable = true)
 |-- e: timestamp (nullable = true)

Test new 1


DataFrame[summary: string, a: string, b: string, c: string]

In [157]:
df.limit(2).collect()


[Row(a=1, b=2.0, c='string1', d=datetime.date(2000, 1, 1), e=datetime.datetime(2000, 1, 1, 12, 0)),
 Row(a=2, b=3.0, c='string2', d=datetime.date(2000, 2, 1), e=datetime.datetime(2000, 1, 2, 12, 0))]

In [158]:
type(df.c.isNull())

pyspark.sql.classic.column.Column

In [159]:
df.withColumn('upper_c', upper(df.c)).filter(df.a == 1).show()

+---+---+-------+----------+-------------------+-------+
|  a|  b|      c|         d|                  e|upper_c|
+---+---+-------+----------+-------------------+-------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|STRING1|
+---+---+-------+----------+-------------------+-------+



In [160]:
@pandas_udf('long')
def pandas_plus_one(series: pd.Series) -> pd.Series:
    return series + 1

df.select(pandas_plus_one(df.a)).show()

+------------------+
|pandas_plus_one(a)|
+------------------+
|                 2|
|                 3|
|                 4|
+------------------+



                                                                                

In [161]:
def pandas_filter_func(iterator):
    for pandas_df in iterator:
        yield pandas_df[pandas_df.a == 1]

df.mapInPandas(pandas_filter_func, schema=df.schema)

DataFrame[a: bigint, b: double, c: string, d: date, e: timestamp]

In [162]:
dfn = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
dfn.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



In [163]:
dfn.groupBy('color').avg().show()

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



In [164]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1 = pandas_df.v1 - pandas_df.v1.mean())

dfn.groupBy('color').applyInPandas(plus_mean, schema=dfn.schema).show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



In [165]:
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ('time', 'id', 'v1'))

df2 = spark.createDataFrame(
    [(20000101, 1, 'x'), (20000101, 2, 'y')],
    ('time', 'id', 'v2'))

def merge_order(l, r):
    return pd.merge_ordered(l, r)

df1.groupBy('id').cogroup(df2.groupBy('id')).applyInPandas(merge_order, schema='time int, id int, v1 double, v2 string').show()

+--------+---+---+----+
|    time| id| v1|  v2|
+--------+---+---+----+
|20000101|  1|1.0|   x|
|20000102|  1|3.0|NULL|
|20000101|  2|2.0|   y|
|20000102|  2|4.0|NULL|
+--------+---+---+----+



In [166]:
df.write.csv("foo.csv", header=True)

AnalysisException: [PATH_ALREADY_EXISTS] Path file:/home/tmkhoa-1812/Projects/PyCharm/LearnPySpark/learn-spark-python/spark/foo.csv already exists. Set mode as "overwrite" to overwrite the existing path. SQLSTATE: 42K04

In [105]:
spark.read.csv("foo.csv", header=True).show()

+---+---+-------+----------+--------------------+
|  a|  b|      c|         d|                   e|
+---+---+-------+----------+--------------------+
|  1|2.0|string1|2000-01-01|2000-01-01T12:00:...|
|  2|3.0|string2|2000-02-01|2000-01-02T12:00:...|
|  3|4.0|string3|2000-03-01|2000-01-03T12:00:...|
+---+---+-------+----------+--------------------+



In [106]:
df.createOrReplaceTempView("tableA")
spark.sql("select count(*) from tableA").show()

+--------+
|count(1)|
+--------+
|       3|
+--------+



In [112]:
@pandas_udf("integer")
def add_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.udf.register("add_one", add_one)
spark.sql("select add_one(a) from tableA").show()

+----------+
|add_one(a)|
+----------+
|         2|
|         3|
|         4|
+----------+



                                                                                

In [148]:
df.selectExpr('add_one(a)', 'b').show()
df.select(expr('count(*)') > 0).show()
df.show()

+----------+---+
|add_one(a)|  b|
+----------+---+
|         2|2.0|
|         3|3.0|
|         4|4.0|
+----------+---+

+--------------+
|(count(1) > 0)|
+--------------+
|          true|
+--------------+

+---+---+-------+----------+-------------------+
|  a|  b|      c|         d|                  e|
+---+---+-------+----------+-------------------+
|  1|2.0|string1|2000-01-01|2000-01-01 12:00:00|
|  2|3.0|string2|2000-02-01|2000-01-02 12:00:00|
|  3|4.0|string3|2000-03-01|2000-01-03 12:00:00|
+---+---+-------+----------+-------------------+



In [150]:
df.filter(expr("b > 2")).select(expr("a + 1 as a1"), expr("b as b1"), expr('c as c1')).show()

+---+---+-------+
| a1| b1|     c1|
+---+---+-------+
|  3|3.0|string2|
|  4|4.0|string3|
+---+---+-------+

