# Criminal Soccer
**Team:** Michael Gangl, Sebastian Grünewald, Patrick Leitner

## Topic
The goal of the project is to load and process various soccer-related datasets and visualize the results. The project is a direct continuation of the Big Data Infrastructure project and is run and analyzed from a data science perspective.
For this purpose, web scraping, API requests, .csv file handling, Kafka, Spark and data cleaning are used. Furthermore, all services and metadata are hosted, which makes the architecture multi-user capable.

## Question
With our diagrams we want to analyze whether the players' foul statistics correlate with the crime statistics and/or the gross domestic product (GDP) of their countries of origin.
Additionally, some other interesting and relevant graphs about the datasets will be shown.
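
As a minimal sketch of what "correlation" could mean here, the snippet below computes a Pearson correlation coefficient in plain Python. The function name `pearson` and both input lists are assumptions made for illustration; the numbers are placeholders and are not taken from the project's datasets.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long numeric sequences."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mean_x = sum(xs) / len(xs)
    mean_y = sum(ys) / len(ys)
    # Sum of products of deviations and the two sums of squared deviations
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    if var_x == 0 or var_y == 0:
        raise ValueError("correlation is undefined for a constant sequence")
    return cov / math.sqrt(var_x * var_y)


# Placeholder values only (NOT taken from the project's datasets)
fouls_per_match = [1.0, 2.0, 3.0, 4.0]
homicide_rate = [2.0, 4.0, 6.0, 8.0]
r = pearson(fouls_per_match, homicide_rate)
print(r)
```

A value near +1 or -1 would point to a strong linear relationship, a value near 0 to none. With only a handful of countries represented in one Champions League season, any such coefficient should be read with caution.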

## Architecture <TBD>
[GitHub](https://github.com/sebi-gr/fh_bdeng_criminal_soccer)

MongoDB: mongodb://pt-n20.p4001.w3.cs.technikum-wien.at:4001<br>
MongoDB Database: `criminalSoccer`<br>
MongoDB Collections: `raw_players`, `players`, `countries`, `world_bank_gdp`<br>

Kafka: kafka:9092<br>
Kafka Topic: `criminal_soccer_gdp`

Spark: org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0
<br>

<img src="imgs/Architekturdiagramm.jpg"/>

## Spark/Kafka Visualization
<img src="imgs/Spark_Architektur.jpg"/>

## Data Collection
The following scripts load the data from various sources and store it in MongoDB.
### Datasets
[Kaggle UEFA (.csv-files)](https://www.kaggle.com/datasets/azminetoushikwasi/ucl-202122-uefa-champions-league): This dataset contains all the player stats of UEFA Champions League season 2021-22.

[Wikipedia National Crime Stats](https://en.wikipedia.org/wiki/List_of_countries_by_intentional_homicide_rate): Wikipedia provides a table of intentional homicide rates per 100,000 inhabitants.

[World Bank API](http://api.worldbank.org/v2/country/all/indicators/NY.GDP.MKTP.CD?format=json&mrnev=1&per_page=300): Gross Domestic Product from every country in the world (already filtered by non empty most recent values)

[Transfermarkt](https://www.transfermarkt.com/schnellsuche/keinergebnis/schnellsuche?query=): To complete the player stats, we used the Transfermarkt API to search for players by their name.

[Exchange Rates](https://v6.exchangerate-api.com): Latest exchange rates from USD (because the World Bank response is in USD) to every currency in the world

### Technical Notice
**Make sure Docker is up and running!**
Because the MongoDB instance hosted by the FH has caused problems in the past, we implemented a local backup strategy with Docker.

### Steps
1. Prepare a local backup strategy for MongoDB with a Docker container, due to problems with the hosted MongoDB instance.
2. Download the UEFA dataset from Kaggle and store the data in MongoDB.
3. Call the World Bank API and publish the responses to a Kafka topic. Then stream them into MongoDB with Spark.
4. Call the exchange rates API, window-stream the rates with Spark, multiply the result by the GDP field in MongoDB and store it.
5. Call the Transfermarkt API and scrape the resulting webpage for additional player information.
6. Scrape Wikipedia to get crime statistics per country and store them in MongoDB.
7. Convert the data to proper data types and clean up the combined dataset.

#### Kaggle .csv dataset:
This dataset contains all the player stats of UEFA Champions League season 2021-22.

In [2]:
import pandas as pd
import zipfile
import os
import requests

# Importing necessary packages for mongodb connectivity
try:
    from pymongo import MongoClient
    from pymongo.errors import ServerSelectionTimeoutError
except ImportError:
    !pip install pymongo[srv]
    from pymongo import MongoClient
    from pymongo.errors import ServerSelectionTimeoutError

# Importing config from config.py
from config import MONGO_HOST_REMOTE, MONGO_DB_REMOTE, MONGO_HOST_LOCAL, MONGO_DB_LOCAL

# Defining constants for kaggle files
UEFA_ZIP = "kaggle_players_zip.zip"
UEFA_UNZIPPED = "kaggle_files"
UEFA_FILES = ["key_stats.csv", "disciplinary.csv", "distributon.csv", "defending.csv"] # only some .csv-files are interesting for our purposes
UEFA_RAW_DATA = "raw_players"

# Defining constants for MongoDB connection
conn_str = MONGO_HOST_REMOTE
mongoDB = MONGO_DB_REMOTE

class MongoContext:
    """mongodb client context manager"""
    def __init__(self):
        self.conn_str = MONGO_HOST_REMOTE
        self.mongoDB = MONGO_DB_REMOTE
        self.client = None

    def __enter__(self):
        try:
            # Connection to Mongo Server from FH-Technikum
            self.client = MongoClient(self.conn_str)
            self.client.server_info()  # raises if the server cannot be reached
            return self.client
        # If connection is not possible, fall back to a local docker instance
        except ServerSelectionTimeoutError as err:
            print("Remote Error: " + str(err))
            os.system("docker pull mongo")
            os.system("docker run -d -p 27017:27017 mongo:latest")
            # Switch this context to the local host and database
            self.conn_str = MONGO_HOST_LOCAL
            self.mongoDB = MONGO_DB_LOCAL
            try:
                # Trying to connect to the local docker Mongo database
                self.client = MongoClient(self.conn_str)
                self.client.server_info()
                return self.client
            except ServerSelectionTimeoutError as errLocal:
                print("Local Error: " + str(errLocal))
                # Neither host is reachable: do not enter the with-block with None
                raise

    def __exit__(self, exception_type, exception_value, exception_traceback):
        if self.client is not None:
            self.client.close()
            self.client = None

def unpack_zip(src, dest):
    """takes files in zip folder from src and extracts them to dest"""
    with zipfile.ZipFile(src, 'r') as zip_ref:
        zip_ref.extractall(dest)

def csv_to_mongo(folder, files, map_key):
    """Fetching data from interesting files in csv folder"""
    # kill existing collection if it exists:
    with MongoContext() as client:
        db = client[mongoDB]
        collection = db[UEFA_RAW_DATA]
        collection.drop()

        for idx, file in enumerate(files):
            df = pd.read_csv(f"{folder}/{file}")
            data = df.to_dict(orient='records')
            if idx == 0:
                # Insert data into mongo
                collection.insert_many(data)
            else:
                for row in data:
                    # Insert data into mongo
                    query = {map_key:  row[map_key]}
                    new_values = {"$set": row}
                    collection.update_one(query, new_values)

def read_from_mongo():
    """reading from mongo database and printing the collection"""
    with MongoContext() as client:
        db = client[mongoDB]
        collection = db[UEFA_RAW_DATA]

        data = collection.find()
        for x in data:
            print("==========================================================================")
            print(x)

def collect_from_kaggle():

    # guard, in case data is already in database.
    with MongoContext() as client:
        db = client[mongoDB]
        if UEFA_RAW_DATA in db.list_collection_names():
            print(f"{UEFA_RAW_DATA} is already in database")
            return False

    unpack_zip(UEFA_ZIP, UEFA_UNZIPPED)
    csv_to_mongo(UEFA_UNZIPPED, UEFA_FILES, "player_name")
    read_from_mongo()

collect_from_kaggle()

raw_players is already in database


False
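
The merge that `csv_to_mongo` performs can be pictured without a database. The following is a simplified sketch, not the project's code: `merge_by_key` is a hypothetical helper, the rows are abbreviated, and the `fouls_committed` values are illustrative. It assumes that player names are unique in the first file, which the real dataset may not guarantee.

```python
def merge_by_key(tables, key):
    """Merge lists of row dicts into one dict of records, keyed by `key`.

    Rows of the first table create the records. Rows of later tables only add
    or overwrite fields on records that already exist, similar to a `$set`
    update on a matching document without upsert.
    """
    merged = {}
    for idx, rows in enumerate(tables):
        for row in rows:
            if idx == 0:
                merged[row[key]] = dict(row)
            elif row[key] in merged:
                merged[row[key]].update(row)
    return merged


key_stats = [{"player_name": "Courtois", "club": "Real Madrid", "goals": 0}]
disciplinary = [
    {"player_name": "Courtois", "fouls_committed": 1},  # illustrative value
    {"player_name": "Unknown", "fouls_committed": 3},   # no match in key_stats
]
players = merge_by_key([key_stats, disciplinary], "player_name")
print(players)
```

Rows whose key does not appear in the first file are dropped in this sketch, which mirrors `update_one` being called without `upsert=True`.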

#### WorldBank API with Kafka
To analyze the players' stats against the gross domestic product of their country of origin, we fetch the corresponding data from the World Bank API and publish it to a Kafka topic via a Kafka producer.

In [1]:
import requests
from kafka import KafkaProducer
import json

# Configure Kafka producer
bootstrap_servers = 'kafka:9092'
topic = 'criminal_soccer_gdp'
producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

# Fetch data from API
api_url = 'http://api.worldbank.org/v2/country/all/indicators/NY.GDP.MKTP.CD?format=json&mrnev=1&per_page=300'
response = requests.get(api_url)
data = response.json()
data = data[1]

# Split data into smaller chunks
chunk_size = 1
chunks = [data[i:i + chunk_size] for i in range(0, len(data), chunk_size)]

# Publish each chunk to Kafka topic
for chunk in chunks:
    producer.send(topic, value=chunk)
    producer.flush()


# Close Kafka producer
producer.close()


NoBrokersAvailable: NoBrokersAvailable

In [37]:
# Optional: delete the topic
#from kafka import KafkaAdminClient
#admin_client = KafkaAdminClient(bootstrap_servers=bootstrap_servers)
#admin_client.delete_topics([topic])

DeleteTopicsResponse_v3(throttle_time_ms=0, topic_error_codes=[(topic='criminal_soccer_gdp', error_code=0)])

#### Spark Data Consumer (Normal Streaming)
After writing the messages to the Kafka topic, we read them with Spark (using streaming) and save them to our MongoDB database.

In [1]:
from pyspark.sql import SparkSession
from config import MONGO_HOST_REMOTE, MONGO_DB_REMOTE, KAFKA_TOPIC, KAFKA_SERVER
from pymongo import MongoClient
import json

# Initialize MongoDB client
mongo_client = MongoClient(MONGO_HOST_REMOTE)
mongo_collection = mongo_client[MONGO_DB_REMOTE]["world_bank_gdp"]

# Define a function to write DataFrame batches to MongoDB
def write_to_mongodb(batch_df, batch_id):
    documents = batch_df.selectExpr("CAST(value AS STRING)").collect()

    for row in documents:
        document = json.loads(row["value"])[0]
        mongo_collection.insert_one(document)


spark = SparkSession.builder.appName("CriminalSoccerStream").config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0").getOrCreate()

df = (spark.readStream                             # readStream indicates streaming
      .format("kafka")                                 # Specify the source format as "kafka"
      .option("kafka.bootstrap.servers", KAFKA_SERVER) # Configure the Kafka server name and port
      .option("subscribe", KAFKA_TOPIC)                # Subscribe to the Kafka topic
      .option("startingOffsets", "earliest")           # Rewind stream to beginning when we restart notebook
      .option("maxOffsetsPerTrigger", 1000)            # Throttle Kafka's processing of the streams
      .load()                                          # Load the DataFrame
      )

df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .foreachBatch(write_to_mongodb) \
    .start()

spark.streams.awaitAnyTermination()

Overwriting spark-consumer.py
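
The consumer indexes each decoded message with `[0]` because the producer sends one-element lists (`chunk_size = 1`). A small round trip without Kafka, as a sketch: the helper `chunked` and the abbreviated `api_reply` are assumptions for illustration, with the record fields borrowed from the World Bank example document in this notebook.

```python
import json


def chunked(records, chunk_size):
    """Split a list into consecutive slices of at most chunk_size items."""
    return [records[i:i + chunk_size] for i in range(0, len(records), chunk_size)]


def serialize(value):
    """Same encoding as the producer's value_serializer."""
    return json.dumps(value).encode("utf-8")


# Reply shape used by the producer cell: element 0 is metadata, element 1 the records
api_reply = [
    {"page": 1},  # placeholder metadata
    [{"countryiso3code": "AFE", "date": "2021", "value": 1089454323810.36}],
]
records = api_reply[1]
messages = [serialize(chunk) for chunk in chunked(records, 1)]

# What the consumer does per message: decode, parse, take the single element
document = json.loads(messages[0].decode("utf-8"))[0]
print(document["countryiso3code"], document["value"])
```

If `chunk_size` were ever raised above 1, the consumer would have to iterate over the decoded list instead of taking only its first element.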


#### Spark Streaming (Windowing)
To convert the GDP into euros, we fetch the current exchange rates and process them with Spark window streaming. The resulting average rate is multiplied by the GDP value already stored in MongoDB, and the product is stored in a separate field (`gdp_eur`).
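
To make the windowing idea concrete, here is a simplified pure-Python model of sliding windows, using the 5-second window and 2-second slide that are configured later in this section. It assumes integer timestamps in seconds and windows whose start times are multiples of the slide; `windows_for` and `windowed_average` are hypothetical helpers, and the rates are placeholder values. It is a sketch of the concept, not Spark's implementation.

```python
from collections import defaultdict


def windows_for(ts, window, slide):
    """Return all (start, end) windows with start <= ts < end.

    Window starts are assumed to be multiples of `slide`.
    """
    start = ts - (ts % slide)  # latest window start that is <= ts
    bounds = []
    while start > ts - window:
        bounds.append((start, start + window))
        start -= slide
    return sorted(bounds)


def windowed_average(events, window, slide):
    """Average the values of (timestamp, value) events per sliding window."""
    buckets = defaultdict(list)
    for ts, value in events:
        for bounds in windows_for(ts, window, slide):
            buckets[bounds].append(value)
    return {bounds: sum(vals) / len(vals) for bounds, vals in sorted(buckets.items())}


events = [(6, 0.92), (7, 0.94)]  # placeholder USD->EUR rates
averages = windowed_average(events, window=5, slide=2)
for bounds, rate in averages.items():
    print(bounds, rate)
```

Because the slide is shorter than the window, one event contributes to several windows: in this model an event at second 6 falls into [2, 7), [4, 9) and [6, 11).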

##### Kafka Producer

In [None]:
import requests
from kafka import KafkaProducer
import json
import time
from datetime import datetime

# Kafka broker configuration
bootstrap_servers = 'kafka:9092'
topic = 'exchange_rates'

# API configuration
api_key = '232b9314dddc44450ad6db95'
api_url = 'https://v6.exchangerate-api.com/v6/'+api_key+'/latest/USD'

def fetch_api_data():
    try:
        response = requests.get(api_url)
        if response.status_code == 200:
            print(datetime.now().strftime("%d/%m/%Y %H:%M:%S") + ": Fetched Data!")
            return response.json()
        else:
            print(f"Failed to fetch API data. Status code: {response.status_code}")
            return None
    except requests.exceptions.RequestException as e:
        print(f"An error occurred while fetching API data: {str(e)}")
        return None

def main():
    # Configure Kafka producer
    producer = KafkaProducer(bootstrap_servers=bootstrap_servers,
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))

    fetch_interval = 6  # 6s
    num_fetches = 10     # = 1min

    for i in range(num_fetches):
        api_data = fetch_api_data()
        if api_data:
            producer.send(topic, value=api_data)
            producer.flush()
        time.sleep(fetch_interval)

    producer.flush()

if __name__ == '__main__':
    main()

##### Spark Streaming Service

In [1]:
from pyspark.sql import SparkSession

# attention: the .config line is specific for the aida-n2

# Spark session & context
spark = (SparkSession
         .builder
         .appName('exchange-rates-consumer')
         # Add kafka package (so that spark can find kafka for streaming)
         .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0")
         .getOrCreate())
sc = spark.sparkContext

In [2]:
# Create stream dataframe setting kafka server, topic and offset option
df = (spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka:9092") # kafka server
      .option("subscribe", "exchange_rates") # topic
      .option("startingOffsets", "earliest") # start from beginning
      .load())

In [3]:
display(df)

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [4]:
from pyspark.sql.types import StringType

# Convert binary to string key and value
df1 = (df
       .withColumn("key", df["key"].cast(StringType()))
       .withColumn("value", df["value"].cast(StringType())))

In [5]:
display(df1)

DataFrame[key: string, value: string, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [6]:
from pyspark.sql.functions import from_json
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType

schema_exchange_rates = StructType([
    StructField("result", StringType(), True),
    StructField("documentation", StringType(), True),
    StructField("terms_of_use", StringType(), True),
    StructField("time_last_update_unix", LongType(), True),
    StructField("time_last_update_utc", StringType(), True),
    StructField("time_next_update_unix", LongType(), True),
    StructField("time_next_update_utc", StringType(), True),
    StructField("base_code", StringType(), True),
    StructField("conversion_rates", StructType([
        StructField("USD", DoubleType(), True),
        StructField("AED", DoubleType(), True),
        StructField("AFN", DoubleType(), True),
        StructField("ALL", DoubleType(), True),
        StructField("AMD", DoubleType(), True),
        StructField("ANG", DoubleType(), True),
        StructField("AOA", DoubleType(), True),
        StructField("ARS", DoubleType(), True),
        StructField("AUD", DoubleType(), True),
        StructField("AWG", DoubleType(), True),
        StructField("AZN", DoubleType(), True),
        StructField("BAM", DoubleType(), True),
        StructField("BBD", DoubleType(), True),
        StructField("BDT", DoubleType(), True),
        StructField("BGN", DoubleType(), True),
        StructField("BHD", DoubleType(), True),
        StructField("BIF", DoubleType(), True),
        StructField("BMD", DoubleType(), True),
        StructField("BND", DoubleType(), True),
        StructField("BOB", DoubleType(), True),
        StructField("BRL", DoubleType(), True),
        StructField("BSD", DoubleType(), True),
        StructField("BTN", DoubleType(), True),
        StructField("BWP", DoubleType(), True),
        StructField("BYN", DoubleType(), True),
        StructField("BZD", DoubleType(), True),
        StructField("CAD", DoubleType(), True),
        StructField("CDF", DoubleType(), True),
        StructField("CHF", DoubleType(), True),
        StructField("CLP", DoubleType(), True),
        StructField("CNY", DoubleType(), True),
        StructField("COP", DoubleType(), True),
        StructField("CRC", DoubleType(), True),
        StructField("CUP", DoubleType(), True),
        StructField("CVE", DoubleType(), True),
        StructField("CZK", DoubleType(), True),
        StructField("DJF", DoubleType(), True),
        StructField("DKK", DoubleType(), True),
        StructField("DOP", DoubleType(), True),
        StructField("DZD", DoubleType(), True),
        StructField("EGP", DoubleType(), True),
        StructField("ERN", DoubleType(), True),
        StructField("ETB", DoubleType(), True),
        StructField("EUR", DoubleType(), True),
        StructField("FJD", DoubleType(), True),
        StructField("FKP", DoubleType(), True),
        StructField("FOK", DoubleType(), True),
        StructField("GBP", DoubleType(), True),
        StructField("GEL", DoubleType(), True),
        StructField("GGP", DoubleType(), True),
        StructField("GHS", DoubleType(), True),
        StructField("GIP", DoubleType(), True),
        StructField("GMD", DoubleType(), True),
        StructField("GNF", DoubleType(), True),
        StructField("GTQ", DoubleType(), True),
        StructField("GYD", DoubleType(), True),
        StructField("HKD", DoubleType(), True),
        StructField("HNL", DoubleType(), True),
        StructField("HRK", DoubleType(), True),
        StructField("HTG", DoubleType(), True),
        StructField("HUF", DoubleType(), True),
        StructField("IDR", DoubleType(), True),
        StructField("ILS", DoubleType(), True),
        StructField("IMP", DoubleType(), True),
        StructField("INR", DoubleType(), True),
        StructField("IQD", DoubleType(), True),
        StructField("IRR", DoubleType(), True),
        StructField("ISK", DoubleType(), True),
        StructField("JEP", DoubleType(), True),
        StructField("JMD", DoubleType(), True),
        StructField("JOD", DoubleType(), True),
        StructField("JPY", DoubleType(), True),
        StructField("KES", DoubleType(), True),
        StructField("KGS", DoubleType(), True),
        StructField("KHR", DoubleType(), True),
        StructField("KID", DoubleType(), True),
        StructField("KMF", DoubleType(), True),
        StructField("KRW", DoubleType(), True),
        StructField("KWD", DoubleType(), True),
        StructField("KYD", DoubleType(), True),
        StructField("KZT", DoubleType(), True),
        StructField("LAK", DoubleType(), True),
        StructField("LBP", DoubleType(), True),
        StructField("LKR", DoubleType(), True),
        StructField("LRD", DoubleType(), True),
        StructField("LSL", DoubleType(), True),
        StructField("LYD", DoubleType(), True),
        StructField("MAD", DoubleType(), True),
        StructField("MDL", DoubleType(), True),
        StructField("MGA", DoubleType(), True),
        StructField("MKD", DoubleType(), True),
        StructField("MMK", DoubleType(), True),
        StructField("MNT", DoubleType(), True),
        StructField("MOP", DoubleType(), True),
        StructField("MRU", DoubleType(), True),
        StructField("MUR", DoubleType(), True),
        StructField("MVR", DoubleType(), True),
        StructField("MWK", DoubleType(), True),
        StructField("MXN", DoubleType(), True),
        StructField("MYR", DoubleType(), True),
        StructField("MZN", DoubleType(), True),
        StructField("NAD", DoubleType(), True),
        StructField("NGN", DoubleType(), True),
        StructField("NIO", DoubleType(), True),
        StructField("NOK", DoubleType(), True),
        StructField("NPR", DoubleType(), True),
        StructField("NZD", DoubleType(), True),
        StructField("OMR", DoubleType(), True),
        StructField("PAB", DoubleType(), True),
        StructField("PEN", DoubleType(), True),
        StructField("PGK", DoubleType(), True),
        StructField("PHP", DoubleType(), True),
        StructField("PKR", DoubleType(), True),
        StructField("PLN", DoubleType(), True),
        StructField("PYG", DoubleType(), True),
        StructField("QAR", DoubleType(), True),
        StructField("RON", DoubleType(), True),
        StructField("RSD", DoubleType(), True),
        StructField("RUB", DoubleType(), True),
        StructField("RWF", DoubleType(), True),
        StructField("SAR", DoubleType(), True),
        StructField("SBD", DoubleType(), True),
        StructField("SCR", DoubleType(), True),
        StructField("SDG", DoubleType(), True),
        StructField("SEK", DoubleType(), True),
        StructField("SGD", DoubleType(), True),
        StructField("SHP", DoubleType(), True),
        StructField("SLE", DoubleType(), True),
        StructField("SLL", DoubleType(), True),
        StructField("SOS", DoubleType(), True),
        StructField("SRD", DoubleType(), True),
        StructField("SSP", DoubleType(), True),
        StructField("STN", DoubleType(), True),
        StructField("SYP", DoubleType(), True),
        StructField("SZL", DoubleType(), True),
        StructField("THB", DoubleType(), True),
        StructField("TJS", DoubleType(), True),
        StructField("TMT", DoubleType(), True),
        StructField("TND", DoubleType(), True),
        StructField("TOP", DoubleType(), True),
        StructField("TRY", DoubleType(), True),
        StructField("TTD", DoubleType(), True),
        StructField("TVD", DoubleType(), True),
        StructField("TWD", DoubleType(), True),
        StructField("TZS", DoubleType(), True),
        StructField("UAH", DoubleType(), True),
        StructField("UGX", DoubleType(), True),
        StructField("UYU", DoubleType(), True),
        StructField("UZS", DoubleType(), True),
        StructField("VES", DoubleType(), True),
        StructField("VND", DoubleType(), True),
        StructField("VUV", DoubleType(), True),
        StructField("WST", DoubleType(), True),
        StructField("XAF", DoubleType(), True),
        StructField("XCD", DoubleType(), True),
        StructField("XDR", DoubleType(), True),
        StructField("XOF", DoubleType(), True),
        StructField("XPF", DoubleType(), True),
        StructField("YER", DoubleType(), True),
        StructField("ZAR", DoubleType(), True),
        StructField("ZMW", DoubleType(), True),
        StructField("ZWL", DoubleType(), True)]))])

# Create dataframe setting schema for event data
df_exchange_rates = (df1
                     # Sets schema for event data
                     .withColumn("value", from_json("value", schema_exchange_rates))
                     )

In [7]:
display(df_exchange_rates)

DataFrame[key: string, value: struct<result:string,documentation:string,terms_of_use:string,time_last_update_unix:bigint,time_last_update_utc:string,time_next_update_unix:bigint,time_next_update_utc:string,base_code:string,conversion_rates:struct<USD:double,AED:double,AFN:double,ALL:double,AMD:double,ANG:double,AOA:double,ARS:double,AUD:double,AWG:double,AZN:double,BAM:double,BBD:double,BDT:double,BGN:double,BHD:double,BIF:double,BMD:double,BND:double,BOB:double,BRL:double,BSD:double,BTN:double,BWP:double,BYN:double,BZD:double,CAD:double,CDF:double,CHF:double,CLP:double,CNY:double,COP:double,CRC:double,CUP:double,CVE:double,CZK:double,DJF:double,DKK:double,DOP:double,DZD:double,EGP:double,ERN:double,ETB:double,EUR:double,FJD:double,FKP:double,FOK:double,GBP:double,GEL:double,GGP:double,GHS:double,GIP:double,GMD:double,GNF:double,GTQ:double,GYD:double,HKD:double,HNL:double,HRK:double,HTG:double,HUF:double,IDR:double,ILS:double,IMP:double,INR:double,IQD:double,IRR:double,ISK:double,JEP:d

In [8]:
windowDuration = "5 seconds"  # Window duration for aggregating data
slideDuration = "2 seconds"   # Sliding interval for window

In [16]:
from pyspark.sql.functions import window, avg
import pandas as pd
from config import MONGO_HOST_REMOTE, MONGO_DB_REMOTE, KAFKA_TOPIC, KAFKA_SERVER
from pymongo import MongoClient

# Initialize MongoDB client
mongo_client = MongoClient(MONGO_HOST_REMOTE)
mongo_collection = mongo_client[MONGO_DB_REMOTE]["world_bank_gdp"]

try:
    # Apply window function on the timestamp column with the specified window duration and sliding interval
    windowed_df = df_exchange_rates.withColumn("window", window("timestamp", windowDuration, slideDuration))

    # Perform aggregation within the window by calculating the average USD -> EUR exchange rate
    aggregated_df = windowed_df.groupBy("window").agg(avg("value.conversion_rates.EUR").alias("average_exchange_rate_EUR"))

    def add_to_mongo(batch_df, batch_id):
        # Read the averaged rate once per batch and skip empty batches
        val = batch_df.select("average_exchange_rate_EUR").first()
        if val is None or val["average_exchange_rate_EUR"] is None:
            return
        average_exchange_rate_eur = val["average_exchange_rate_EUR"]

        # Iterate over each stored GDP document
        for document in mongo_collection.find():
            # Get the GDP value in USD
            existing_value = document["value"]

            # Calculate the GDP in EUR
            gdp_eur = existing_value * average_exchange_rate_eur

            # Update the document with the new field and value
            mongo_collection.update_one(
                {"_id": document["_id"]},
                {"$set": {"gdp_eur": gdp_eur}}
            )

        # mongo_client stays open here: foreachBatch calls this function
        # again for every following micro-batch


    # Start the streaming query to process the data
    query = (aggregated_df
             .writeStream
             .outputMode("update")  # Use Update mode for aggregation
             .foreachBatch(add_to_mongo)  # Call the add_to_mongo function for each batch
             .option("checkpointLocation", "checkpoints")  # Specify the checkpoint directory path
             .start())

    # Wait for the streaming query to finish processing the data
    query.awaitTermination()

except KeyboardInterrupt:
    print("Process terminated")

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/opt/conda/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


Process terminated


#### Transfermarkt API and Crawler
<img src="imgs/Transfermarkt_logo.png" style="width:200px; height:auto"/>

In order to collect information on the players' nationality, we query the Transfermarkt [website](https://transfermarkt.com). We then parse the HTML response to collect `full_name`, `nationality`, `icon` and `market_value` from it.

In [3]:
try:
    import parsel
except ImportError:
    !pip install parsel
    import parsel


def transfermarkt_spider(name):
    """queries transfermarkt.com and parses response table"""
    ua = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/112.0.0.0 Safari/537.36"
    header = {"user-agent": ua}
    result = None
    with requests.Session() as session:
        url = "https://www.transfermarkt.com/schnellsuche/ergebnis/schnellsuche"
        req = session.get(url, params={"query": name}, headers=header) # API call
        response = parsel.Selector(req.text)
        # retrieve data from the first row of the result table
        row = response.xpath("//table[@class='items']/tbody/tr[1]")
        if row:  # an empty selection means the search returned no player
            icon_url = row.xpath(".//table//img/@src").get()
            full_name = row.xpath(".//table//img/@title").get()
            national = row.xpath("./td[5]/img[1]/@alt").get()
            value = row.xpath("./td[6]/text()").get()
            result = dict(icon=icon_url, full_name=full_name, nationality=national, market_value=value)
    return result

# Collect from Transfermarkt.com
with MongoContext() as client:
    # MongoDB connection
    db = client[mongoDB]
    collection = db[UEFA_RAW_DATA]
    raw = collection.find()
    count = 0

    # add the complementary data to the mongo documents
    mongo_rows = collection.find({ "nationality": { "$exists":False }})
    for mongo_player in mongo_rows:
        print(mongo_player["player_name"])
        if not mongo_player.get("nationality"):
            name = mongo_player["player_name"]
            transfer_data = transfermarkt_spider(name)
            if transfer_data:
                print(transfer_data)
                collection.update_one({"player_name": name}, {"$set": transfer_data})

Courtois
{'icon': 'https://img.a.transfermarkt.technology/portrait/small/108390-1665067957.jpg?lm=1', 'full_name': 'Thibaut Courtois', 'nationality': 'Belgium', 'market_value': '€45.00m'}
Vinícius Júnior
{'icon': 'https://img.a.transfermarkt.technology/portrait/small/371998-1664869583.jpg?lm=1', 'full_name': 'Vinicius Junior', 'nationality': 'Brazil', 'market_value': '€150.00m'}
Benzema
{'icon': 'https://img.a.transfermarkt.technology/portrait/small/18922-1653042225.jpg?lm=1', 'full_name': 'Karim Benzema', 'nationality': 'France', 'market_value': '€25.00m'}
Modrić
{'icon': 'https://img.a.transfermarkt.technology/portrait/small/27992-1661426133.jpg?lm=1', 'full_name': 'Luka Modric', 'nationality': 'Croatia', 'market_value': '€10.00m'}
Éder Militão
{'icon': 'https://img.a.transfermarkt.technology/portrait/small/401530-1568189259.jpg?lm=1', 'full_name': 'Éder Militão', 'nationality': 'Brazil', 'market_value': '€70.00m'}
Alaba
{'icon': 'https://img.a.transfermarkt.technology/portrait/small
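
The scraped `market_value` is stored as a string such as `'€45.00m'`. If it should ever be used numerically, a conversion along the following lines could work. This is a sketch under assumptions: `parse_market_value` is a hypothetical helper, and only the `m` suffix appears in the output above, so the `k` and `bn` suffixes are guesses.

```python
import re

# Assumed suffixes; only "m" is confirmed by the scraped output
_MULTIPLIERS = {"k": 1_000, "m": 1_000_000, "bn": 1_000_000_000}


def parse_market_value(text):
    """Convert a string like '€45.00m' to a float in euros, or None if unparseable."""
    if not text:
        return None
    match = re.fullmatch(r"€\s*([\d.]+)\s*(k|m|bn)?", text.strip())
    if match is None:
        return None
    amount, suffix = match.groups()
    try:
        number = float(amount)
    except ValueError:  # e.g. "1.2.3"
        return None
    return number * _MULTIPLIERS.get(suffix, 1)


print(parse_market_value("€45.00m"))
print(parse_market_value("-"))
```

Returning `None` for anything unexpected keeps the behavior in line with how `type_converter` treats values it cannot convert.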

#### Fetch crime stats from Wikipedia:
Download crime statistics (intentional homicide rate per 100,000 inhabitants) for all countries in the world.


In [4]:
import parsel


def crime_from_wiki():
    """scrape from wikipedia and yield results"""
    url = "https://en.wikipedia.org/wiki/List_of_countries_by_intentional_homicide_rate"
    with requests.Session() as session:
        req = session.get(url)
        response = parsel.Selector(req.text)
        table = response.xpath("//table[contains(@class,'static-row-numbers')]")
        body = table.xpath("./tbody//tr")
        for row in body:
            country = row.xpath("./td[1]//a/text()").get()
            if country:
                country = country.strip("*")
                country = country.strip()
                count_p_100k = float(row.xpath("./td[4]/text()").get())
                yield {"country":country, "count_p_100k":count_p_100k}

# Collect from Wikipedia
with MongoContext() as client:
    db= client[mongoDB]
    collection = db["countries"]
    collection.drop()
    for country in crime_from_wiki():
        collection.insert_one(country)

    for x in collection.find():
        print(x)

{'_id': ObjectId('648f35f6cedd83ded33bc567'), 'country': 'Afghanistan', 'count_p_100k': 6.7}
{'_id': ObjectId('648f35f6cedd83ded33bc568'), 'country': 'Albania', 'count_p_100k': 2.1}
{'_id': ObjectId('648f35f6cedd83ded33bc569'), 'country': 'Algeria', 'count_p_100k': 1.3}
{'_id': ObjectId('648f35f6cedd83ded33bc56a'), 'country': 'Andorra', 'count_p_100k': 2.6}
{'_id': ObjectId('648f35f6cedd83ded33bc56b'), 'country': 'Angola', 'count_p_100k': 4.8}
{'_id': ObjectId('648f35f6cedd83ded33bc56c'), 'country': 'Anguilla', 'count_p_100k': 28.3}
{'_id': ObjectId('648f35f6cedd83ded33bc56d'), 'country': 'Antigua and Barbuda', 'count_p_100k': 9.2}
{'_id': ObjectId('648f35f6cedd83ded33bc56e'), 'country': 'Argentina', 'count_p_100k': 5.3}
{'_id': ObjectId('648f35f6cedd83ded33bc56f'), 'country': 'Armenia', 'count_p_100k': 1.8}
{'_id': ObjectId('648f35f6cedd83ded33bc570'), 'country': 'Aruba', 'count_p_100k': 1.9}
{'_id': ObjectId('648f35f6cedd83ded33bc571'), 'country': 'Australia', 'count_p_100k': 0.9}
{'
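
`crime_from_wiki` passes the table cell directly to `float()`, which raises if the cell is missing or contains extra characters. A more defensive variant might look like the sketch below; `parse_rate` is a hypothetical helper and the sample inputs are assumptions about what such cells could contain.

```python
import re


def parse_rate(cell_text):
    """Extract the first decimal number from a table cell, or None if there is none."""
    if cell_text is None:
        return None
    match = re.search(r"\d+(?:\.\d+)?", cell_text)
    return float(match.group()) if match else None


print(parse_rate("6.7\n"))
print(parse_rate(None))
```

Rows for which `parse_rate` returns `None` could then be skipped instead of aborting the whole scrape.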

## Transform Data
In order to analyze the data, we apply type conversions to selected attributes.

In [5]:
TYPE_CONVERSIONS = {"minutes_played": "int",
                    'match_played': "int", 'goals': "int", 'assists': "int", 'distance_covered': 'float',
                    'fouls_committed': "int", 'fouls_suffered': "int", 'red': "int", 'yellow': "int",
                    'cross_accuracy': "int", 'cross_attempted': "int", 'cross_complted': "int",
                    'freekicks_taken': "int", 'pass_accuracy': "float", 'pass_attempted': "int",
                    'pass_completed': "int",
                    'balls_recoverd': "int",
                    'clearance_attempted': "int",
                    't_lost': "int",
                    't_won': "int",
                    'tackles': "int"
                    }

def type_converter(item: dict, definitions) -> dict:
    """converts all values that are in a given type key"""
    new_item = dict()
    for k, v in item.items():
        if k in definitions:
            if definitions[k] == "int":
                try:
                    float(v)
                    v = int(v)
                except:
                    v = None
            elif definitions[k] == "float":
                try:
                    v = float(v)
                except:
                    v = None

        new_item[k] = v
    return new_item

def transform_raw_data():
    with MongoContext() as client:
        db = client[mongoDB]
        raw = db[UEFA_RAW_DATA]
        collection = db["players"]
        collection.drop()
        for doc in raw.find():
            cleaned_item = type_converter(doc, TYPE_CONVERSIONS)
            collection.insert_one(cleaned_item)

transform_raw_data()

### Metadata in MongoDB
#### Collections
##### Players:
Example data entry from a soccer player:

```{'_id': ObjectId('643c1b35afd7d39c21e6dbf8'),'player_name': 'Courtois', 'club': 'Real Madrid', 'position': 'Goalkeeper', 'minutes_played': 1230, 'match_played': 13, 'goals': 0, 'assists': 0, 'distance_covered': 64.2, 'cross_accuracy': 0, 'cross_attempted': 0, 'cross_complted': 0, 'freekicks_taken': 27, 'pass_accuracy': 76.7, 'pass_attempted': 483, 'pass_completed': 365, 'serial': 447, 'full_name': 'Thibaut Courtois', 'icon': 'https://img.a.transfermarkt.technology/portrait/small/108390-1665067957.jpg?lm=1', 'market_value': '€60.00m', 'nationality': 'Belgium'}```

##### Countries:
Example data entry for the crime stats:

`{'_id': ObjectId('643c37c7afd7d39c21e6dfc0'), 'country': 'Afghanistan', 'count_p_100k': 6.7}`

##### World Bank GDP
Example data entry of the World Bank Data:
`{
  'indicator': {
    'id': 'NY.GDP.MKTP.CD',
    'value': 'GDP (current US$)'
  },
  'country': {
    'id': 'ZH',
    'value': 'Africa Eastern and Southern'
  },
  'countryiso3code': 'AFE',
  'date': '2021',
  'value': 1089454323810.36,
  'obs_status': '',
  'decimal': 0,
  'gdp_eur': 1000772741852.1967
}`
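
The relation between `value` and `gdp_eur` is a single multiplication, so the exchange rate that was applied can be recovered from the example document. A short worked check, where `usd_to_eur` is a hypothetical helper and the two numbers are copied from the document above:

```python
def usd_to_eur(gdp_usd, rate):
    """Convert a GDP figure from USD to EUR with a given USD->EUR rate."""
    return gdp_usd * rate


doc = {"value": 1089454323810.36, "gdp_eur": 1000772741852.1967}

# Rate implied by the stored fields: gdp_eur = value * rate
implied_rate = doc["gdp_eur"] / doc["value"]
print(round(implied_rate, 4))  # 0.9186

# Applying the implied rate reproduces the stored EUR figure
print(usd_to_eur(doc["value"], implied_rate))
```

The implied rate reflects the averaged window value at the time the streaming job ran, so re-running the job later would be expected to yield a different `gdp_eur`.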

## Data Cleaning
Data cleaning of the `world_bank_gdp` MongoDB collection is not necessary, because the API argument `mrnev=1` already restricts the response to the most recent non-empty value per country.
<>

## Data Analysis
<>

## Data Output
<>