In [1]:
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q https://dlcdn.apache.org/spark/spark-3.3.1/spark-3.3.1-bin-hadoop3.tgz

In [2]:
!tar xf spark-3.3.1-bin-hadoop3.tgz

In [3]:
import os

# Point Spark at the Java runtime and at the unpacked Spark distribution.
# NOTE(review): the /content prefix presumably targets Google Colab's working
# directory — adjust both paths when running elsewhere.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-11-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.3.1-bin-hadoop3",
})

In [4]:
!pip install -q findspark

In [5]:
import findspark

# Make the Spark installation found via SPARK_HOME importable as `pyspark`
# (spark_home=None is the default and means "look it up from the environment").
findspark.init(spark_home=None)

In [6]:
import multiprocessing

import pyspark

cfg = (
    pyspark.SparkConf()
    # Run Spark locally with one worker thread per CPU core.
    .setMaster(f"local[{multiprocessing.cpu_count()}]")
    # Set the application name.
    .setAppName("TestApp")
    # Generic key/value setting. SparkConf values are strings, so pass "false"
    # rather than the Python bool False (which would be stored as "False").
    .set("spark.eventLog.enabled", "false")
    # Environment variables to export to the executors.
    .setExecutorEnv(pairs=[("VAR3", "value3"), ("VAR4", "value4")])
    # Set executor memory only if it has not been configured already.
    .setIfMissing("spark.executor.memory", "1g")
)

# Read back a single setting.
print(cfg.get("spark.executor.memory"))
# List every setting as a human-readable "key=value" string.
print(cfg.toDebugString())

1g
spark.master=local[2]
spark.app.name=TestApp
spark.eventLog.enabled=False
spark.executorEnv.VAR3=value3
spark.executorEnv.VAR4=value4
spark.executor.memory=1g


In [84]:
from pyspark.sql import Row, SparkSession
from pyspark.sql import functions as func

# Reuse an already-running session if there is one, otherwise start a new one.
# NOTE(review): the SparkConf `cfg` built earlier is not passed here
# (e.g. via `.config(conf=cfg)`), so its settings are not applied to this
# session — confirm whether that is intended.
spark = (
    SparkSession.builder
    .appName("SparkSQL_Total_By_Customer")
    .getOrCreate()
)


In [26]:
def mapper(line):
    """Parse one comma-separated text line into a ``Row``.

    Only the first three fields are used, in order: an integer client id
    (``client``), an integer second field (``strange``) and a float amount
    (``spent``). Any extra fields are ignored; fewer than three fields, or
    non-numeric values, raise ``IndexError`` / ``ValueError``.

    NOTE(review): the second field is presumably an item/product id — the
    column name ``strange`` looks like a mistranslation; confirm before renaming.
    """
    parts = line.split(',')
    # Convert in the same order as the Row fields so errors surface identically.
    client_id = int(parts[0])
    second_id = int(parts[1])
    amount = float(parts[2])
    return Row(client=client_id, strange=second_id, spent=amount)

In [27]:
# Read the orders file as an RDD of raw text lines (one element per line).
lines = spark.sparkContext.textFile(name="customer-orders.csv")

In [86]:
# Parse every text line into a Row (lazy: nothing executes until an action runs).
customers = lines.map(f=mapper)


In [89]:
# Build the DataFrame with an explicit schema. The types are the ones Spark would
# infer from Python int/float (bigint/double), but declaring them avoids the
# extra job that samples rows for schema inference and documents the columns.
# The result is cached because it is used again by the view and the projection.
schemaCustomers = spark.createDataFrame(
    customers, schema="client bigint, strange bigint, spent double"
).cache()
# Register it for SQL queries under the temporary view name "customers".
schemaCustomers.createOrReplaceTempView("customers")
# Keep only the columns needed for the per-client spending totals.
schemaCustomers1 = schemaCustomers.select("client", "spent")


In [90]:
# Total amount spent per client, rounded to 2 decimals (row order is not guaranteed).
(
    schemaCustomers1
    .groupBy("client")
    .agg(func.round(func.sum(func.col("spent")), 2).alias("spentby"))
    .show()
)


+------+-------+
|client|spentby|
+------+-------+
|    29|5032.53|
|    26| 5250.4|
|    65|5140.35|
|    54|6065.39|
|    19|5059.43|
|     0|5524.95|
|    22|5019.45|
|     7|4755.07|
|    77|4327.73|
|    34| 5330.8|
|    50|4517.27|
|    94|4475.57|
|    57| 4628.4|
|    43|5368.83|
|    32|5496.05|
|    84|4652.94|
|    31|4765.05|
|    98|4297.26|
|    39|6193.11|
|    25|5057.61|
+------+-------+
only showing top 20 rows



In [92]:
# Same per-client totals, ordered from the smallest to the largest spender.
(
    schemaCustomers1
    .groupBy("client")
    .agg(func.round(func.sum(func.col("spent")), 2).alias("spentby"))
    .orderBy(func.col("spentby").asc())
    .show()
)


+------+-------+
|client|spentby|
+------+-------+
|    45|3309.38|
|    79|3790.57|
|    96|3924.23|
|    23|4042.65|
|    99|4172.29|
|    75| 4178.5|
|    36|4278.05|
|    98|4297.26|
|    47| 4316.3|
|    77|4327.73|
|    13|4367.62|
|    48|4384.33|
|    49| 4394.6|
|    94|4475.57|
|    67|4505.79|
|    50|4517.27|
|    78|4524.51|
|     5|4561.07|
|    57| 4628.4|
|    83| 4635.8|
+------+-------+
only showing top 20 rows

