##  PySpark
> Ivan Valdes

In [0]:
# Import libraries
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.functions import lit, sum, col, expr, concat_ws

### 1. Create a DataFrame with customer's data, including name, age, and city of residence.

In [0]:
# Data
df=  [("Alice", 25, "New York"),
        ("Bob", 30, "Los Angeles"),
        ("Charlie", 22, "Chicago")]

# Define the schema with column names
schema = StructType([
    StructField("Name", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("City", StringType(), True)
])

# Create DataFrame
dfCustomer=spark.createDataFrame(df, schema=schema)
display(dfCustomer)

Name,Age,City
Alice,25,New York
Bob,30,Los Angeles
Charlie,22,Chicago


### 2. Display only the client names in the DataFrame.

In [0]:
# We transform the data, keeping only the name
dfCustomerSelect = (dfCustomer
  .select("Name") 
)

# Display results
dfCustomerSelect.show()

+-------+
|   Name|
+-------+
|  Alice|
|    Bob|
|Charlie|
+-------+



### 3. Filter clients whose age is greater than or equal to 25.

In [0]:
# Age filter
dfCustomerOlder25 = dfCustomer.filter(dfCustomer["Age"] >= 25)

# Display results
dfCustomerOlder25.show()

+-----+---+-----------+
| Name|Age|       City|
+-----+---+-----------+
|Alice| 25|   New York|
|  Bob| 30|Los Angeles|
+-----+---+-----------+



### 4. Add a new column "Country" with a constant value for all rows.

In [0]:
# Add a new column
dfCustomerWithCountry = dfCustomer.withColumn("Country", lit("EEUU"))

# Display results
dfCustomerWithCountry.show()

+-------+---+-----------+-------+
|   Name|Age|       City|Country|
+-------+---+-----------+-------+
|  Alice| 25|   New York|   EEUU|
|    Bob| 30|Los Angeles|   EEUU|
|Charlie| 22|    Chicago|   EEUU|
+-------+---+-----------+-------+



### 5. Calculate the average age.

In [0]:
# Average
average_age = dfCustomer.agg({"Age": "avg"}).collect()[0][0]

# Display results
print("Average age:", round(average_age,1))

Average age: 25.7


### 6. Sort the DataFrame by age in descending order.

In [0]:
# Descending order
dfOrderedDesc = dfCustomer.orderBy("Age", ascending=False)

# Display results
dfOrderedDesc.show()

+-------+---+-----------+
|   Name|Age|       City|
+-------+---+-----------+
|    Bob| 30|Los Angeles|
|  Alice| 25|   New York|
|Charlie| 22|    Chicago|
+-------+---+-----------+



### 7. Group by city and calculate the count of people in each city.

In [0]:
# Add more customers
new_data = [("Frank", 40, "Chicago"),
            ("Grace", 28, "New York")]
dfNewData = spark.createDataFrame(new_data, schema=schema)

# Add the new rows to the existing DataFrame using union.
dfCustomerGroupByCity = dfCustomer.union(dfNewData)

# Group by city and calculate the count the clients in each city
dfCustomerGroupByCity = dfCustomerGroupByCity.groupBy("City").count()

# Display results
print("Number of customers in each city:")
dfCustomerGroupByCity.show()

Number of customers in each city:
+-----------+-----+
|       City|count|
+-----------+-----+
|   New York|    2|
|Los Angeles|    1|
|    Chicago|    2|
+-----------+-----+



### 8. Rename the column "Name" to "Full_Name."

In [0]:
# Change name of the column "Name" to "Full_Name"
dfCustomerFullName = dfCustomer.withColumnRenamed("Name", "Full_Name")

# Display results
dfCustomerFullName.show()

+---------+---+-----------+
|Full_Name|Age|       City|
+---------+---+-----------+
|    Alice| 25|   New York|
|      Bob| 30|Los Angeles|
|  Charlie| 22|    Chicago|
+---------+---+-----------+



### 9. Drop the "Age" column from the DataFrame.

In [0]:
# Drop column
dfCustomerDropAge = dfCustomer.drop("Age")

# Display results
dfCustomerDropAge.show()

+-------+-----------+
|   Name|       City|
+-------+-----------+
|  Alice|   New York|
|    Bob|Los Angeles|
|Charlie|    Chicago|
+-------+-----------+



### 10. Perform a SQL query on the DataFrame to select customers older than 20 years.

In [0]:
# Add more customers
new_data = [("Frank", 18, "Chicago"),
            ("Grace", 17, "New York")]
dfNewData = spark.createDataFrame(new_data, schema=schema)

# Add the new rows to the existing DataFrame using union
dfCustomerOlder20 = dfCustomer.union(dfNewData)

# Perform the SQL query to select people over 20 years old
query = "SELECT * FROM older20 WHERE Age > 20"

# Register the DataFrame as a temporary table
dfCustomerOlder20.createOrReplaceTempView("older20")

# Perform SQL
result = spark.sql(query)

# Display results
result.show()

+-------+---+-----------+
|   Name|Age|       City|
+-------+---+-----------+
|  Alice| 25|   New York|
|    Bob| 30|Los Angeles|
|Charlie| 22|    Chicago|
+-------+---+-----------+



### 11. Calculate the total sum of all ages.

In [0]:
# Function sum()
ageSum = dfCustomer.select(sum(dfCustomer.Age))

# Display results
ageSum.show()

+--------+
|sum(Age)|
+--------+
|      77|
+--------+



### 12. Calculate the minimum and maximum age of all clients.

In [0]:
# Minimum
min = dfCustomer.agg({"Age": "min"}).collect()[0][0]
print ('Minimum age:', min)

# Maximun
max = dfCustomer.agg({"Age": "max"}).collect()[0][0]
print ('Maximum age:', max)

Minimum age: 22
Maximum age: 30


### 13. Filter clients whose city of residence is "Chicago" and age is less than 30.

In [0]:
# Filter
df_chicago_30 = (dfCustomer
.filter(col("City") == "Chicago")
.filter(col("Age") < 30)
)

# Display results
df_chicago_30.show()

+-------+---+-------+
|   Name|Age|   City|
+-------+---+-------+
|Charlie| 22|Chicago|
+-------+---+-------+



### 14. Add a new column "DoubledAge" that contains twice the value of the age.

In [0]:
# Add column
dfCustomerDoubledAge = dfCustomer.withColumn("Doubled_Age", dfCustomer["Age"] * 2)

# Display results
dfCustomerDoubledAge.show()

+-------+---+-----------+-----------+
|   Name|Age|       City|Doubled_Age|
+-------+---+-----------+-----------+
|  Alice| 25|   New York|         50|
|    Bob| 30|Los Angeles|         60|
|Charlie| 22|    Chicago|         44|
+-------+---+-----------+-----------+



### 15. Convert all ages from years to months.

In [0]:
# Addign a new column
dfCustomerAgeInMonths = dfCustomer.withColumn("Age_In_Months", dfCustomer["Age"] * 12)

# Display results
dfCustomerAgeInMonths.show()

# Editing Age column
dfCustomerAgeInMonths1 = dfCustomer.withColumn("Age", dfCustomer["Age"] * 12)

# Display results
dfCustomerAgeInMonths1.show()

+-------+---+-----------+-------------+
|   Name|Age|       City|Age_In_Months|
+-------+---+-----------+-------------+
|  Alice| 25|   New York|          300|
|    Bob| 30|Los Angeles|          360|
|Charlie| 22|    Chicago|          264|
+-------+---+-----------+-------------+

+-------+---+-----------+
|   Name|Age|       City|
+-------+---+-----------+
|  Alice|300|   New York|
|    Bob|360|Los Angeles|
|Charlie|264|    Chicago|
+-------+---+-----------+



### 16. Count the total number of clients in the DataFrame.

In [0]:
print('Total clients: ',dfCustomer.count())

Total clients:  3


### 17. Filter clients whose age is an even number.

In [0]:
# Filter
dfEven = dfCustomer.filter((dfCustomer["Age"] % 2) == 0)

# Display results
dfEven.show()

+-------+---+-----------+
|   Name|Age|       City|
+-------+---+-----------+
|    Bob| 30|Los Angeles|
|Charlie| 22|    Chicago|
+-------+---+-----------+



### 18. Calculate the count of clients per age range (0-20, 21-40, 41-60, 61+).

In [0]:
# Add more customers
new_data = [("Frank", 5, "Chicago"),
            ("Grace", 22, "New York"),
            ("Ivan", 45, "Miami"),
            ("Jose", 81, "Tampa")
            ]
dfNewData = spark.createDataFrame(new_data, schema=schema)

# Add the new rows to the existing DataFrame using union
dfCustomerByAgeRange = dfCustomer.union(dfNewData)

# Create a new column with age range
dfCustomerByAgeRange = dfCustomerByAgeRange.withColumn("AgeRange", expr(
    "CASE WHEN Age <= 20 THEN '0-20' " +
    "WHEN Age <= 40 THEN '21-40' " +
    "WHEN Age <= 60 THEN '41-60' " +
    "ELSE '61+' END"
))

# Display results
dfCustomerByAgeRange.show()

# Count by age range
result = dfCustomerByAgeRange.groupBy("AgeRange").count()

# Display results
result.show()


+-------+---+-----------+--------+
|   Name|Age|       City|AgeRange|
+-------+---+-----------+--------+
|  Alice| 25|   New York|   21-40|
|    Bob| 30|Los Angeles|   21-40|
|Charlie| 22|    Chicago|   21-40|
|  Frank|  5|    Chicago|    0-20|
|  Grace| 22|   New York|   21-40|
|   Ivan| 45|      Miami|   41-60|
|   Jose| 81|      Tampa|     61+|
+-------+---+-----------+--------+

+--------+-----+
|AgeRange|count|
+--------+-----+
|   21-40|    4|
|    0-20|    1|
|   41-60|    1|
|     61+|    1|
+--------+-----+



### 19. Count how many clients share the same name.

In [0]:
# Add more customers
new_data = [("Alice", 5, "Chicago")]
dfNewData = spark.createDataFrame(new_data, schema=schema)

# Add the new rows to the existing DataFrame using union
resultNameDuplicated = dfCustomer.union(dfNewData)

# Group by
resultNameDuplicated = resultNameDuplicated.groupBy("Name").count()

# Display results
resultNameDuplicated.show()

+-------+-----+
|   Name|count|
+-------+-----+
|  Alice|    2|
|    Bob|    1|
|Charlie|    1|
+-------+-----+



### 20. Concatenate the "Name" and "Age" columns into a new column called "PersonalInformation."

In [0]:
# Concatenate Columns
dfCustomerNameCityJoined = dfCustomer.withColumn("Personal_Information", concat_ws(", ", dfCustomer["Name"], dfCustomer["City"]))

# Display results
dfCustomerNameCityJoined.show()

+-------+---+-----------+--------------------+
|   Name|Age|       City|Personal_Information|
+-------+---+-----------+--------------------+
|  Alice| 25|   New York|     Alice, New York|
|    Bob| 30|Los Angeles|    Bob, Los Angeles|
|Charlie| 22|    Chicago|    Charlie, Chicago|
+-------+---+-----------+--------------------+

