In [8]:
for str in ["Python", "Jupyter", "Spark"]:
    print(f"Hello {str}")

Hello Python
Hello Jupyter
Hello Spark


# Quickstart: DataFrame
## [Grouping Data](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html#Grouping-Data)   
PySpark DataFrame also provides a way of handling grouped data by using the common approach, split-apply-combine strategy. It groups the data by a certain condition applies a function to each group and then combines them back to the DataFrame.

In [11]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([
    ['red', 'banana', 1, 10], ['blue', 'banana', 2, 20], ['red', 'carrot', 3, 30],
    ['blue', 'grape', 4, 40], ['red', 'carrot', 5, 50], ['black', 'carrot', 6, 60],
    ['red', 'banana', 7, 70], ['red', 'grape', 8, 80]], schema=['color', 'fruit', 'v1', 'v2'])
df.show()

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|  red|banana|  1| 10|
| blue|banana|  2| 20|
|  red|carrot|  3| 30|
| blue| grape|  4| 40|
|  red|carrot|  5| 50|
|black|carrot|  6| 60|
|  red|banana|  7| 70|
|  red| grape|  8| 80|
+-----+------+---+---+



Grouping and then applying the `avg()` function to the resulting groups.

In [9]:
df.groupby('color').avg().show()

[Stage 9:>                                                        (0 + 16) / 16]

+-----+-------+-------+
|color|avg(v1)|avg(v2)|
+-----+-------+-------+
|  red|    4.8|   48.0|
| blue|    3.0|   30.0|
|black|    6.0|   60.0|
+-----+-------+-------+



                                                                                

You can also apply a Python native function against each group by using pandas API.

In [10]:
def plus_mean(pandas_df):
    return pandas_df.assign(v1=pandas_df.v1 - pandas_df.v1.mean())

df.groupby('color').applyInPandas(plus_mean, schema=df.schema).show()

[Stage 14:>                                                         (0 + 1) / 1]

+-----+------+---+---+
|color| fruit| v1| v2|
+-----+------+---+---+
|black|carrot|  0| 60|
| blue|banana| -1| 20|
| blue| grape|  1| 40|
|  red|banana| -3| 10|
|  red|carrot| -1| 30|
|  red|carrot|  0| 50|
|  red|banana|  2| 70|
|  red| grape|  3| 80|
+-----+------+---+---+



                                                                                

## [Getting Data In/Out](https://spark.apache.org/docs/latest/api/python/getting_started/quickstart_df.html#Grouping-Data)  
CSV is straightforward and easy to use. Parquet and ORC are efficient and compact file formats to read and write faster.

In [79]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet('data/date=2009-01-03/part-00000-bdd84ab2-82e9-4a79-8212-7accd76815e8-c000.snappy.parquet')

df.show(truncate=False)

+----------------------------------------------------------------+-------+----+----------------------------------------------------------------+------------+------------+---------+-----------+------------+-----------+------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------+----------+
|hash                                                            |version|size|block_hash                                                      |block_number|virtual_size|lock_time|input_count|output_count|is_coinbase|output_value|outputs                                                                                                                                                                                                      

### Load Multiple Parquet Files.

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.read.parquet("data/*")
df.select("hash", "inputs", "outputs").show(3, truncate=False)

+----------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [4]:
df.select("hash", "inputs", "outputs").printSchema()

root
 |-- hash: string (nullable = true)
 |-- inputs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- index: long (nullable = true)
 |    |    |-- required_signatures: long (nullable = true)
 |    |    |-- script_asm: string (nullable = true)
 |    |    |-- script_hex: string (nullable = true)
 |    |    |-- sequence: long (nullable = true)
 |    |    |-- spent_output_index: long (nullable = true)
 |    |    |-- spent_transaction_hash: string (nullable = true)
 |    |    |-- txinwitness: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- value: double (nullable = true)
 |-- outputs: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: string (nullable = true)
 |    |    |-- index: long (nullable = true)
 |    |    |-- required_signatures: long (nullable = true)
 |   

In [5]:
from pyspark.sql.functions import explode, col, format_number

# Extract sender addresses from inputs
inputs_df = df.select(
    col("hash"),
    col("block_timestamp"),
    col("date"),
    explode("inputs").alias("input")
).select(
    "hash",
    "block_timestamp",
    "date",
    col("input.address").alias("from_address")  # Changed 'addresses' to 'address'
)

# Extract recipient addresses and amounts from outputs
outputs_df = df.select(
    col("hash"),
    explode("outputs").alias("output")
).select(
    "hash",
    col("output.address").alias("to_address"),  # Changed 'addresses' to 'address'
    col("output.value").alias("amount")
)

# Join inputs_df and outputs_df based on transaction_hash
tx_df = inputs_df.join(outputs_df, "hash")
tx_df = tx_df.withColumn("amount", format_number("amount", 8))

# Display the result
l = ["4f297d691eb07c559705d00fefae7903f3adb4228a751c5515d4850deb790978", "100c011029401366296b91ebb666f8e97cbac0e9e5e93c0122a506198ce75c60"]
tx_df.filter(tx_df.hash.isin(l)).show(truncate=False)



+----------------------------------------------------------------+-------------------+----------+------------------------------------------+------------------------------------------+----------+
|hash                                                            |block_timestamp    |date      |from_address                              |to_address                                |amount    |
+----------------------------------------------------------------+-------------------+----------+------------------------------------------+------------------------------------------+----------+
|100c011029401366296b91ebb666f8e97cbac0e9e5e93c0122a506198ce75c60|2024-02-09 03:42:39|2024-02-08|3Fc8Mm4xeCHkDnvJgxLZue4cJH7KnphzDp        |bc1qrvp9w4rgtnt9a03aqcafqp29j3js22q9nudxjn|0.00221300|
|100c011029401366296b91ebb666f8e97cbac0e9e5e93c0122a506198ce75c60|2024-02-09 03:42:39|2024-02-08|3Fc8Mm4xeCHkDnvJgxLZue4cJH7KnphzDp        |3Qwm9ENNQW8P4Zzwy3G4oJwc5Wdq17eU8o        |0.00641522|
|100c011029401366296b91eb

                                                                                