In [12]:
# Problem 1 - Bubble sort
# def bubble_sort(sequence):
#     # Write your bubble sort code here.
#     return sequence

# assert bubble_sort([5, 1, 3, 2, 4]) == [1, 2, 3, 4, 5]

###############################################################

"""Answer to Problem 1 - Bubble sort"""
def bubble_sort(sequence):
    """Sort ``sequence`` in ascending order, in place, using bubble sort.

    Every pass walks the still-unsorted prefix and swaps adjacent items that
    are out of order, so the largest remaining item "bubbles" to the end of
    that prefix. Everything after the position of the last swap is already
    in its final place, so the next pass stops there; a pass without any
    swap ends the sort.

    Args:
        sequence: A mutable, indexable sequence (e.g. a list) whose items
            can be compared with ``>``.

    Returns:
        The same ``sequence`` object, now sorted.
    """
    unsorted_end = len(sequence) - 1
    while unsorted_end > 0:
        last_swap = 0
        for i in range(unsorted_end):
            if sequence[i] > sequence[i + 1]:
                sequence[i], sequence[i + 1] = sequence[i + 1], sequence[i]
                last_swap = i
        # Items beyond the last swap are sorted; shrink the range to scan.
        unsorted_end = last_swap

    return sequence

def _swap(sequence: list, index_1: int, index_2: int) -> None:
    """Exchange the items at ``index_1`` and ``index_2`` of ``sequence``.

    The list is modified in place and nothing is returned.
    """
    sequence[index_1], sequence[index_2] = sequence[index_2], sequence[index_1]


# Sanity check taken from the problem statement.
assert bubble_sort([5, 1, 3, 2, 4]) == [1, 2, 3, 4, 5]

In [None]:
# Problem 2 - Find second largest
# def find_second_largest(sequence):
#     # Write your algorithm with O(n) time complexity here.
#     return second_largest

# assert find_second_largest([3, 3, 2, 1]) == 2
# assert find_second_largest([3, 3, 3, 3, 3, 2, 2, 1]) == 2
# assert find_second_largest([-1, 2, 3, 5, 3, 1, 2, 4]) == 4

###############################################################

"""
Answer to Problem 2 - Find second largest

The requirements do not specify what to return in the following
edge cases, so I assume that the returned value is None
in those cases:
  1. What if the sequence is empty or only contains one element?
  2. What if all values in sequence are the same?
"""

def find_second_largest(sequence):
    """Return the second-largest distinct value of ``sequence`` in O(n) time.

    The input is scanned exactly once and never copied, so any iterable
    (list, tuple, generator, ...) is accepted.

    Args:
        sequence: An iterable of mutually comparable values.

    Returns:
        The largest value that is strictly smaller than the maximum, or
        ``None`` when no such value exists (empty input, a single element,
        or all elements equal).
    """
    largest = None
    second_largest = None

    for n in sequence:
        if largest is None or n > largest:
            # New maximum: the previous maximum becomes the runner-up.
            largest, second_largest = n, largest
        elif n < largest and (second_largest is None or n > second_largest):
            # Strictly below the maximum, so duplicates of it are ignored.
            second_largest = n

    return second_largest

# Cases given in the problem statement.
assert find_second_largest([3, 3, 2, 1]) == 2
assert find_second_largest([3, 3, 3, 3, 3, 2, 2, 1]) == 2
assert find_second_largest([-1, 2, 3, 5, 3, 1, 2, 4]) == 4
# Additional testing: inputs without a second-largest value yield None.
# None is a singleton, so it is compared by identity.
assert find_second_largest([]) is None
assert find_second_largest([3]) is None
assert find_second_largest([3, 3, 3, 3]) is None



In [29]:
# Problem 3 - Inheritance
# Write some examples with inheritance code here.

###############################################################

"""
Answer to # Problem 3 - Inheritance

2 examples to demo concept of inheritance:
    1. Simple factory
    2. Override method for GCP credentials customization on Prefect
        (This example can not be executed directly)

       In this scenario, I need to interact with GoogleSheets using Prefect,
       an open-source orchestration framework. But the built-in `GcpCredentials`
       block class does not support required scopes for GoogleSheets API access.

       To solve it, I created a subclass `GcpCredentialsWithDriveScope`
       and overrode the method `get_credentials_from_service_account` to
       make it support additional scopes.
"""

###### Example 1. ######

# Parent class
class Animal:
    """Base type of the example hierarchy; subclasses supply ``speak``."""

    def speak(self):
        """Return the animal's sound; the base version always raises."""
        message = "Subclass must implement this method"
        raise NotImplementedError(message)

# Inherit and implement Dog from Animal
class Dog(Animal):
    """Animal subclass that overrides ``speak`` with a dog sound."""

    def speak(self):
        """Return ``"Woof!"``."""
        return "Woof!"

# Inherit and implement Cat from Animal
class Cat(Animal):
    """Animal subclass that overrides ``speak`` with a cat sound."""

    def speak(self):
        """Return ``"Meow!"``."""
        return "Meow!"

# Factory function that returns an instance based on type
def animal_factory(animal_type):
    """Create the animal instance matching ``animal_type``.

    Args:
        animal_type: ``"dog"`` or ``"cat"``.

    Returns:
        A new ``Dog`` or ``Cat`` instance.

    Raises:
        ValueError: If ``animal_type`` is neither of the known names.
    """
    if animal_type == "dog":
        return Dog()

    if animal_type == "cat":
        return Cat()

    raise ValueError(f"Unknown animal type: {animal_type}")

# Test the factory
# Build each animal through the factory and print what it says.
animals = ["dog", "cat"]
for a in animals:
    animal = animal_factory(a)
    # The same call works on every instance; the subclass decides the text.
    print(f"{a.capitalize()}: {animal.speak()}")


###### Example 2. ######

# OAuth scope URLs passed as `scopes=` when service-account credentials are
# built below: cloud-platform, drive and bigquery.
GCP_CREDENTIAL_SCOPE = [
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/drive",
    "https://www.googleapis.com/auth/bigquery",
]

class GcpCredentialsWithDriveScope(GcpCredentials):
    """
    GcpCredentials variant that requests the scopes in GCP_CREDENTIAL_SCOPE.

    Intended for working with GoogleSheets.

    reference:
        https://www.googlecloudcommunity.com/gc/Technical-Tips-Tricks/
        Access-Denied-BigQuery-BigQuery-Permission-denied-while-getting/ta-p/587743
    """

    def get_credentials_from_service_account(
        self: "GcpCredentialsWithDriveScope",
    ) -> Credentials:
        """
        Build credentials, overriding the parent implementation.

        Lookup order:
            1. ``service_account_info`` (its secret value is unwrapped),
            2. ``service_account_file``,
            3. ``google.auth.default()`` when neither is set.

        The first two are created with ``scopes=GCP_CREDENTIAL_SCOPE``; the
        default credentials are returned as obtained.
        """
        if self.service_account_info:
            return Credentials.from_service_account_info(
                self.service_account_info.get_secret_value(),
                scopes=GCP_CREDENTIAL_SCOPE,
            )

        if self.service_account_file:
            return Credentials.from_service_account_file(
                self.service_account_file,
                scopes=GCP_CREDENTIAL_SCOPE,
            )

        # Only the credentials are needed from the tuple returned by default().
        default_credentials, _ = google.auth.default()
        return default_credentials


Dog: Woof!
Cat: Meow!


In [None]:
# Problem 4 - *args, **kwargs
# Write some examples with *args, **kwargs here.

###############################################################

"""
Answer to # Problem 4 - *args, **kwargs

2 examples to demo *args & **kwargs:
    1. How *args & **kwargs work
        *args: Positional arguments, collected into a tuple.
        **kwargs: Keyword arguments, collected into a dict.
    2. Define a MySQL connection function
        (This example may not be executed directly without packages installed)
"""

###### Example 1. ######

def print_args_and_kwargs(*args, **kwargs):
    """Print all positional arguments, then all keyword arguments.

    Each section starts with a header line; positional arguments are shown
    with their index, keyword arguments with their name.
    """
    sections = (
        (
            "Positional arguments (*args):",
            (f"  arg[{position}] = {item}" for position, item in enumerate(args)),
        ),
        (
            "Keyword arguments (**kwargs):",
            (f"  {name} = {item}" for name, item in kwargs.items()),
        ),
    )

    # The row generators are lazy, so each row is formatted just before it is printed.
    for header, rows in sections:
        print(header)
        for row in rows:
            print(row)

# Example usage
# Arguments written out directly in the call.
print_args_and_kwargs(
    "apple", "banana", 42,
    name="Allen", job="Data Engineer", location="Taiwan",
)

print("==============")

# Same arguments, this time unpacked from a list (*) and a dict (**);
# the captured output of both calls is identical.
print_args_and_kwargs(
    *["apple", "banana", 42],
    **{"name": "Allen", "job": "Data Engineer", "location": "Taiwan"},
)


###### Example 2. ######

from urllib.parse import quote_plus
from sqlalchemy import create_engine
from sqlalchemy.engine import Connection

def connect_to_mysql(*args: str, **kwargs: object) -> Connection:
    """
    Connect to MySQL using positional and keyword arguments.

    Args:
    - *args (str): Exactly four positional arguments, in this order:
        1. username (str)
        2. password (str)
        3. server (str)
        4. db_name (str)
    - **kwargs: Additional keyword arguments, forwarded as one dict to the
        `connect_args` parameter of SQLAlchemy's `create_engine` function,
        such as SSL options and timeouts. Values may be of any type.

    Returns:
    - Connection: The connection opened on a newly created engine for the
        given MySQL database.

    Raises:
    - ValueError: If the number of positional arguments is not 4.

    Example:
    ```python
    connection = connect_to_mysql(
        "my_user",
        "my_password",
        "localhost",
        "my_database",
        ssl={"fake_ssl": True}
    )
    ```
    """
    if len(args) != 4:
        raise ValueError(
            "Expected 4 positional arguments: username, password, server, db_name",
        )

    raw_username, raw_password, server, db_name = args

    # Percent-encode the credentials so that characters such as "@" or "/"
    # cannot break the structure of the connection URL.
    username = quote_plus(raw_username)
    password = quote_plus(raw_password)

    engine = create_engine(
        f"mysql+pymysql://{username}:{password}@{server}/{db_name}",
        connect_args=kwargs,
    )
    return engine.connect()


Positional arguments (*args):
  arg[0] = apple
  arg[1] = banana
  arg[2] = 42
Keyword arguments (**kwargs):
  name = Allen
  job = Data Engineer
  location = Taiwan
Positional arguments (*args):
  arg[0] = apple
  arg[1] = banana
  arg[2] = 42
Keyword arguments (**kwargs):
  name = Allen
  job = Data Engineer
  location = Taiwan


In [37]:
# Problem 5 - lambda
# Write some examples using python lambda here.

###############################################################

"""
Answer to # Problem 5 - Lambda

4 examples showing how lambda works:
    1. map()
    2. sorted()
    3. pandas apply()
        (This example may not be executed directly without packages installed)
    4. pyspark udf()
        (This example may not be executed directly without packages installed)
"""

###### Example 1. ######

nums = [1, 2, 3, 4]
# map() applies the lambda to each item lazily; list() materializes the results.
squared = list(map(lambda x: x ** 2, nums))
print(squared)
print("==============")


###### Example 2. ######

# Each entry is a (name, age) tuple.
people = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
# Sort by age (second element in tuple); sorted() returns a new list and
# leaves `people` unchanged.
sorted_people = sorted(people, key=lambda x: x[1])

print(sorted_people)
print("==============")


###### Example 3. ######

import pandas as pd

# Small sample frame with one row per student.
df = pd.DataFrame({
    "name": ["Tom", "Jerry", "Spike"],
    "score": [60, 85, 70],
})
# Use lambda on apply() to get score result: a score of 70 or more is
# labelled "pass", anything lower "fail".
df["result"] = df["score"].apply(lambda x: "pass" if x >= 70 else "fail")

print(df)
print("==============")


###### Example 4. ######

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

spark = SparkSession.builder.master("local[*]").appName("LambdaExample").getOrCreate()

try:
    data = [("Tom", 60), ("Jerry", 85), ("Spike", 70)]
    df_spark = spark.createDataFrame(data, ["name", "score"])

    # Use lambda via UDF to get score result: a score of 70 or more is
    # labelled "pass", anything lower "fail".
    grade_udf = udf(lambda x: "pass" if x >= 70 else "fail", StringType())
    df_spark = df_spark.withColumn("result", grade_udf(col("score")))

    df_spark.show()
finally:
    # Stop the local session once the demo is done (even on failure) so it is
    # not left running in the background after this cell finishes.
    spark.stop()


[1, 4, 9, 16]
[('Bob', 25), ('Alice', 30), ('Charlie', 35)]
    name  score result
0    Tom     60   fail
1  Jerry     85   pass
2  Spike     70   pass


In [41]:
# Problem 6 - comprehension
# Write some examples using python comprehension here.

###############################################################

"""
Answer to # Problem 6 - Comprehension

2 examples to show how comprehension works:
    1. Parse raw string into a dictionary using dict comprehension
    2. Reverse a nested dictionary using dict comprehension
"""

###### Example 1. ######

payload_raw_str = """
q: google search
sca_esv: f4af6905b7ddecc7
sxsrf: AHTn8zrK9qT3xkCaIovpv_rSpRvm4edKwQ:1743737301251
ei: 1VHvZ-WAD8vq1e8PuNGpiQE
ved: 0ahUKEwilv7DTt72MAxVLdfUHHbhoKhEQ4dUDCBA
uact: 5
oq: google search
gs_lp: Egxnd3Mtd2l6LXNlcnAiDWdvb2dsZSBzZWFyY2gyChAAGLADGNYEGEcyChAAGLA
sclient: gws-wiz-serp
"""

# Generate payload dict using comprehension.
# Each non-blank line is split once, at the first ": " only, so a value that
# itself contains ": " is kept whole instead of being truncated.
payload_dict = {
    key: value
    for key, value in (
        row.split(": ", 1)
        for row in payload_raw_str.splitlines()
        if row.strip()
    )
}
print(payload_dict)


###### Example 2. ######

# Product category -> ids of the products in that category.
product_dict = {
    "book": [101, 102],
    "toy": [201, 202],
    "food": [301],
}

# Invert the mapping with a nested dict comprehension: every product id
# becomes a key that points back to its category.
reversed_dict = {
    product_id: category
    for category, product_ids in product_dict.items()
    for product_id in product_ids
}
print(reversed_dict)


{'q': 'google search', 'sca_esv': 'f4af6905b7ddecc7', 'sxsrf': 'AHTn8zrK9qT3xkCaIovpv_rSpRvm4edKwQ:1743737301251', 'ei': '1VHvZ-WAD8vq1e8PuNGpiQE', 'ved': '0ahUKEwilv7DTt72MAxVLdfUHHbhoKhEQ4dUDCBA', 'uact': '5', 'oq': 'google search', 'gs_lp': 'Egxnd3Mtd2l6LXNlcnAiDWdvb2dsZSBzZWFyY2gyChAAGLADGNYEGEcyChAAGLA', 'sclient': 'gws-wiz-serp'}
{101: 'book', 102: 'book', 201: 'toy', 202: 'toy', 301: 'food'}


In [None]:
# Problem 7 - decorator
# Write some examples using python decorator here.

###############################################################

"""
Answer to # Problem 7 - decorator

3 examples of decorators:
    1. Deprecation decorator from third-party library
        (This example may not be executed directly without packages installed)
        Uses `@deprecated` to indicate that a function should no longer be used.
        This helps developers to maintain code quality.
    2. Airflow DAG and task decorators
        (This example can not be executed directly)
    3. Custom memoization decorator
        Implements caching using a decorator `@memoize` to avoid re-computation.
"""

###### Example 1. ######

from deprecated import deprecated

# Mark the function as deprecated and point callers to its replacement.
@deprecated(reason="Use utilities.database.connect_to_mssql instead.")
def connect_to_mssql(schema, con_engine='pymssql'):
    # Placeholder body: only the decorator's effect is being demonstrated.
    pass

# Calling the deprecated function (the captured cell output echoes this call).
connect_to_mssql("test")
print("==============")


###### Example 2. ######

import pendulum
from airflow.decorators import dag, task, bash_task

# Define the DAG
# The @dag decorator turns the function below into a DAG definition.
@dag(
    # Cron expression: every minute during hours 1-2 and 7-8.
    schedule="* 1-2,7-8 * * *",
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
)
def d_04_example_dag_decorator():
    # Two tasks defined with @task; each only prints a message.
    @task
    def task1():
        print("Running Task 1")
    @task
    def task2():
        print("Running Task 2")

    # Task defined with @bash_task; the function returns a shell command string.
    @bash_task
    def task3():
        return "echo 'Hello from Task 3!'"

    # Dependencies: task2 and task3 are both downstream of task1.
    t1 = task1()
    task2().set_upstream(t1)
    task3().set_upstream(t1)

# Call the decorated function to instantiate the DAG.
d_04_example_dag_decorator()


###### Example 3. ######

import random
import time

def memoize(func):
    """Cache the results of ``func`` per distinct set of call arguments.

    The first call with a given combination of arguments runs ``func`` and
    stores the result; later calls with the same arguments return the stored
    value. A short message is printed on every call saying which of the two
    happened.

    Args:
        func: The function to wrap. All arguments it is called with must be
            hashable, because they are used as the cache key.

    Returns:
        A wrapper that keeps the name and docstring of ``func`` and accepts
        both positional and keyword arguments.
    """
    import functools

    cache = {}
    # Unique separator between positional and keyword parts of a cache key,
    # so that the two can never be confused with each other.
    kwargs_mark = object()

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        key = args
        if kwargs:
            # Sort by name so that keyword order does not create distinct keys.
            key += (kwargs_mark, *sorted(kwargs.items()))

        # Positional-only calls are reported exactly as the argument tuple.
        shown = args if not kwargs else (args, kwargs)

        if key not in cache:
            print(f'\tRunning {func.__name__} with {shown}, and caching the result')
            cache[key] = func(*args, **kwargs)  # run the function, and cache the result
        else:
            print(f'\t{shown} was cached; using that value')

        return cache[key]

    return wrapper

# Deliberately slow addition, wrapped with the caching decorator defined above.
@memoize
def slow_add(first, second):
    # Sleep a random whole number of seconds from 0 to 3 (randint includes
    # both bounds) to simulate expensive work.
    time.sleep(random.randint(0, 3))
    return first + second

# The first call for each argument pair runs slow_add; the repeated calls are
# answered from the cache (see the messages in the captured output).
print(slow_add(2, 3))
print(slow_add(3, 4))
print(slow_add(2, 3))
print(slow_add(3, 4))
print("==============")


  connect_to_mssql("test")


	Running slow_add with (2, 3), and caching the result
5
	Running slow_add with (3, 4), and caching the result
7
	(2, 3) was cached; using that value
5
	(3, 4) was cached; using that value
7


In [None]:
# Problem 8 - generator
# Write some examples using python generator here.




# Explain the benefit of generators here.
#
#
#

###############################################################

"""
Answer to # Problem 8 - Generator

A generator is a function that uses the `yield` keyword. It returns an iterator that does not
store all values in memory at once; instead, it produces one value at a time on demand.
This makes generators memory-efficient, especially when working with large datasets or
infinite sequences.

2 examples for large dataset and infinite sequences:
    1. [Large dataset] A generator to read large files line by line.
    2. [Infinite sequences] A generator to yield sequence of squares of numbers.
"""

###### Example 1. ######

def read_large_file(file_path):
    """Lazily yield the lines of the text file at ``file_path``.

    Leading and trailing whitespace (including the newline) is stripped from
    every line. The file is read one line at a time instead of being loaded
    as a whole.
    """
    with open(file_path, "r") as handle:
        yield from map(str.strip, handle)


###### Example 2. ######

def square_generator(n=None):
    """Lazily yield the squares 0, 1, 4, 9, ... of consecutive integers.

    Args:
        n: How many squares to produce (the squares of ``0`` to ``n - 1``).
            With the default ``None`` the sequence is infinite, so the caller
            decides when to stop consuming it (e.g. ``itertools.islice`` or
            ``break``).

    Yields:
        ``i ** 2`` for ``i`` = 0, 1, 2, ...
    """
    from itertools import count

    indices = count() if n is None else range(n)
    for i in indices:
        yield i ** 2

# Consume the generator: each loop iteration pulls the next square on demand.
for value in square_generator(5):
    print(value)



In [None]:
# Problem 9 - context manager
# Write some examples using python context manager here.

###############################################################

"""
Answer to # Problem 9 - Context Manager

2 examples for how it works and how it is implemented
    1. The object returned by open() is a context manager, so the file is closed
        safely when it is used with the `with` keyword.
        (This example can not be executed directly)
    2. Implement context manager using `__enter__` and `__exit__` methods.
        (This example can not be executed directly)

        By default, each BigQuery query runs in a separate session. But some operations,
        such as CTEs, require a shared session.

        The context manager ensures that:
        1. A new BigQuery session will be created when entering the context.
        2. All queries inside the `with` block run under the same session.
        3. The session is safely closed at the end, even if an error occurs.

        This helps prevent resource locking and ensures cleanup in case of failure.
"""

###### Example 1. ######

# The file is closed automatically when the `with` block is left, including
# when an exception is raised inside it.
with open("sample.txt", "r") as f:
    content = f.read()
    print("File content loaded.")


###### Example 2. ######

from types import TracebackType
from google.cloud.bigquery import Client as BigQueryClient, QueryJobConfig, ConnectionProperty

class BigquerySession:
    """Context manager that opens a BigQuery session and aborts it on exit.

    Entering the context yields the session id; leaving it aborts that
    session.
    """

    def __init__(self: "BigquerySession", bqclient: BigQueryClient) -> None:
        """Store the client; no session exists until the context is entered."""
        self._bigquery_client = bqclient
        self._session_id = None

    def __enter__(self: "BigquerySession") -> str:
        """Start a session with a trivial query and return its session id."""
        session_config = QueryJobConfig(create_session=True)
        # "SELECT 1" has no purpose other than making BigQuery create the session.
        job = self._bigquery_client.query("SELECT 1", job_config=session_config)
        self._session_id = job.session_info.session_id
        job.result()  # block until the job has completed
        return self._session_id

    def __exit__(
        self: "BigquerySession",
        exc_type: type[BaseException] | None,
        exc_value: BaseException | None,
        traceback: TracebackType | None,
    ) -> None:
        """Abort the session opened by ``__enter__``, if there is one."""
        if not self._session_id:
            return

        # Always abort so that a clean state is left behind (after a script
        # failure the table is sometimes locked in the session).
        session_property = ConnectionProperty(
            key="session_id", value=self._session_id,
        )
        abort_config = QueryJobConfig(
            create_session=False,
            connection_properties=[session_property],
        )
        abort_job = self._bigquery_client.query(
            "CALL BQ.ABORT_SESSION()",
            job_config=abort_config,
        )
        abort_job.result()

In [47]:
# Problem 10 - magic methods
# Write some examples using python magic methods here.

###############################################################

"""
Answer to # Problem 10 - Magic Methods

Special methods in Python that start and end with double underscores.
They allow objects to implement behavior for built-in operations like:
    - `__str__`: Used to print a human-readable object
    - `__repr__`: Can be directly shown when debugging, for example in Jupyter notebook,
                    we can simply execute the variable to see what it is
    - `__len__`: length of the object, which is used by `len()`
    - `__eq__`: equality comparison, which is used by `==`
"""

class Product:
    """A product with a name, a price and a list of tags.

    Demonstrates several magic methods: ``__str__``, ``__repr__``,
    ``__len__``, ``__eq__`` and ``__hash__``.
    """

    def __init__(self, name: str, price: float, tags: list[str]) -> None:
        """Store the given name, price and tags on the instance."""
        self.name = name
        self.price = price
        self.tags = tags

    def __str__(self) -> str:
        """Human-readable string, used by print()."""
        return f"{self.name} (${self.price})"

    def __repr__(self) -> str:
        """Developer-friendly string, used in interactive shell."""
        return f"Product(name={self.name!r}, price={self.price}, tags={self.tags})"

    def __len__(self) -> int:
        """Number of tags on the product."""
        # NOTE: no __bool__ is defined, so a product without tags is falsy.
        return len(self.tags)

    def __eq__(self, other: object) -> bool:
        """Compare products by name and price (tags are ignored)."""
        if not isinstance(other, Product):
            # Let Python try the reflected comparison / fall back to identity.
            return NotImplemented
        return self.name == other.name and self.price == other.price

    def __hash__(self) -> int:
        """Hash on the same fields as ``__eq__``.

        Defining ``__eq__`` alone makes instances unhashable; this keeps them
        usable in sets and as dict keys, with equal products hashing equally.
        Avoid changing ``name`` or ``price`` while an instance is stored in
        such a container.
        """
        return hash((self.name, self.price))

# p1 and p2 share name and price but carry different tags; p3 differs in every field.
p1 = Product("T-shirt", 19.99, ["clothing", "casual"])
p2 = Product("T-shirt", 19.99, ["sale", "summer"])
p3 = Product("Sneakers", 49.99, ["shoes"])


In [50]:
# __str__(): print() shows each product as "name ($price)".
print(p1)
print(p2)
print(p3)

T-shirt ($19.99)
T-shirt ($19.99)
Sneakers ($49.99)


In [51]:
# __repr__(): the bare expression ending the cell is displayed in its repr form.
p1

Product(name='T-shirt', price=19.99, tags=['clothing', 'casual'])

In [52]:
# __repr__(): the bare expression ending the cell is displayed in its repr form.
p2

Product(name='T-shirt', price=19.99, tags=['sale', 'summer'])

In [53]:
# __repr__(): the bare expression ending the cell is displayed in its repr form.
p3

Product(name='Sneakers', price=49.99, tags=['shoes'])

In [57]:
# __len__(): len() on a product returns its number of tags.
print(len(p1))
print(len(p2))
print(len(p3))

2
2
1


In [58]:
# __eq__(): only name and price are compared, so p1 == p2 holds even though
# their tags differ, while p2 and p3 differ in name and price.
print(p1 == p2)
print(p2 == p3)

True
False


25/04/04 16:25:05 WARN HeartbeatReceiver: Removing executor driver with no recent heartbeats: 185839 ms exceeds timeout 120000 ms
25/04/04 16:25:05 WARN SparkContext: Killing executors is not supported by current scheduler.
25/04/04 16:25:09 WARN Executor: Issue communicating with driver in heartbeater
org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:101)
	at org.apache.spark.rpc.RpcEndpointRef.askSync(RpcEndpointRef.scala:85)
	at org.apache.spark.storage.BlockManagerMaster.registerBlockManager(BlockManagerMaster.scala:80)
	at org.apache.spark.storage.BlockManager.reregister(BlockManager.scala:642)
	at org.apache.spark.executor.Executor.reportHeartBeat(Executor.scala:1223)
	at o