In [1]:
text = """
You can define background tasks to be run after returning a response.

This is useful for operations that need to happen after a request, but that the client doesn't really have to be waiting for the operation to complete before receiving the response.

This includes, for example:

Email notifications sent after performing an action:
As connecting to an email server and sending an email tends to be "slow" (several seconds), you can return the response right away and send the email notification in the background.
Processing data:
For example, let's say you receive a file that must go through a slow process, you can return a response of "Accepted" (HTTP 202) and process it in the background.
Using BackgroundTasks
First, import BackgroundTasks and define a parameter in your path operation function with a type declaration of BackgroundTasks:


from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}
FastAPI will create the object of type BackgroundTasks for you and pass it as that parameter.

Create a task function
Create a function to be run as the background task.

It is just a standard function that can receive parameters.

It can be an async def or normal def function, FastAPI will know how to handle it correctly.

In this case, the task function will write to a file (simulating sending an email).

And as the write operation doesn't use async and await, we define the function with normal def:


from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}
Add the background task
Inside of your path operation function, pass your task function to the background tasks object with the method .add_task():


from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def write_notification(email: str, message=""):
    with open("log.txt", mode="w") as email_file:
        content = f"notification for {email}: {message}"
        email_file.write(content)


@app.post("/send-notification/{email}")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_notification, email, message="some notification")
    return {"message": "Notification sent in the background"}
.add_task() receives as arguments:

A task function to be run in the background (write_notification).
Any sequence of arguments that should be passed to the task function in order (email).
Any keyword arguments that should be passed to the task function (message="some notification").
Dependency Injection
Using BackgroundTasks also works with the dependency injection system, you can declare a parameter of type BackgroundTasks at multiple levels: in a path operation function, in a dependency (dependable), in a sub-dependency, etc.

FastAPI knows what to do in each case and how to re-use the same object, so that all the background tasks are merged together and are run in the background afterwards:


from typing import Optional

from fastapi import BackgroundTasks, Depends, FastAPI

app = FastAPI()


def write_log(message: str):
    with open("log.txt", mode="a") as log:
        log.write(message)


def get_query(background_tasks: BackgroundTasks, q: Optional[str] = None):
    if q:
        message = f"found query: {q}\n"
        background_tasks.add_task(write_log, message)
    return q


@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks, q: str = Depends(get_query)
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent"}
In this example, the messages will be written to the log.txt file after the response is sent.

If there was a query in the request, it will be written to the log in a background task.

And then another background task generated at the path operation function will write a message using the email path parameter.

Technical Details
The class BackgroundTasks comes directly from starlette.background.

It is imported/included directly into FastAPI so that you can import it from fastapi and avoid accidentally importing the alternative BackgroundTask (without the s at the end) from starlette.background.

By only using BackgroundTasks (and not BackgroundTask), it's then possible to use it as a path operation function parameter and have FastAPI handle the rest for you, just like when using the Request object directly.

It's still possible to use BackgroundTask alone in FastAPI, but you have to create the object in your code and return a Starlette Response including it.

You can see more details in Starlette's official docs for Background Tasks.

Caveat
If you need to perform heavy background computation and you don't necessarily need it to be run by the same process (for example, you don't need to share memory, variables, etc), you might benefit from using other bigger tools like Celery.

They tend to require more complex configurations, a message/job queue manager, like RabbitMQ or Redis, but they allow you to run background tasks in multiple processes, and especially, in multiple servers.

To see an example, check the Project Generators, they all include Celery already configured.

But if you need to access variables and objects from the same FastAPI app, or you need to perform small background tasks (like sending an email notification), you can simply just use BackgroundTasks.

Recap
Import and use BackgroundTasks with parameters in path operation functions and dependencies to add background tasks.
"""

In [2]:
with open("mymodel.json", "w") as _file:
  _file.write(text)

In [3]:
! mv mymodel.json ./drive/My\ Drive

mv: cannot move 'mymodel.json' to './drive/My Drive': No such file or directory
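
The `mv` above fails because `./drive/My Drive` does not exist in this runtime. One likely cause (an assumption, not confirmed by the notebook) is that Google Drive was never mounted; in Colab that is normally done with `drive.mount` from `google.colab`. A minimal sketch of a safer copy step follows. The helper name `copy_to_folder` is hypothetical, and it only checks that the destination folder exists before copying.

```python
import shutil
from pathlib import Path


def copy_to_folder(src: str, dest_dir: str) -> Path:
    """Copy src into dest_dir, failing early if dest_dir is missing."""
    dest = Path(dest_dir)
    if not dest.is_dir():
        # Same situation as the shell error above, but with a clearer message.
        raise FileNotFoundError(
            f"{dest} does not exist; mount Drive (or create the folder) first"
        )
    source = Path(src)
    if source.is_dir():
        # Directories (for example a saved-model folder) need a recursive copy.
        return Path(shutil.copytree(source, dest / source.name))
    # Regular files (for example a weights file) are copied with metadata.
    return Path(shutil.copy2(source, dest / source.name))
```

The same helper would cover both the single-file copies and the recursive folder copy attempted later in the notebook.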


In [4]:
import tensorflow as tf
from tensorflow.keras.applications.vgg16 import VGG16

In [5]:
model = VGG16()

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/vgg16/vgg16_weights_tf_dim_ordering_tf_kernels.h5


In [6]:
model.save("./vgg_model_folder")



In [7]:
# Second approach:
# save the weights and the architecture (as JSON) in separate steps
model.save_weights("vgg_weights.h5")

In [8]:
model_json = model.to_json()
with open("vgg_model_json.json", "w") as _file:
  _file.write(model_json)

In [9]:
! cp vgg_weights.h5 ./drive/My\ Drive

cp: cannot create regular file './drive/My Drive': No such file or directory


In [10]:
! cp -r vgg_model_folder ./drive/My\ Drive

cp: cannot create directory './drive/My Drive': No such file or directory


In [11]:
# OPTIMIZERS IN DL

In [12]:
# 1. HPO - Learning rate (LR)
# High LR - steps are too large, so we can overshoot the global minimum (the valley point)
# Low  LR - steps are tiny, so it takes ages to reach the minimum
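
The effect of the learning rate can be seen on a toy problem. This is a minimal sketch, assuming a one-dimensional error function E(w) = w**2 chosen only for illustration; the function name `gradient_descent` and the three learning rates are hypothetical choices, not values from the notebook.

```python
def gradient_descent(lr: float, steps: int = 20, w: float = 5.0) -> float:
    """Minimise E(w) = w**2 with plain gradient descent and return the final w."""
    for _ in range(steps):
        grad = 2.0 * w       # dE/dw for E(w) = w**2
        w = w - lr * grad    # vanilla update: w = w - alpha * dE/dw
    return w


# Too high, reasonable, and too low learning rates on the same problem.
for lr in (1.1, 0.1, 0.001):
    print(f"lr={lr}: final w = {gradient_descent(lr):.4f}")
```

With the high rate each step overshoots the minimum at w = 0 and the value grows; with the low rate w has barely moved from its start after 20 steps.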

In [13]:
# A well-chosen LR speeds up training and helps it converge,
# but it is also risky: too low is slow, too high can overshoot or diverge

In [14]:
# So we need something that can push the ball onward if it gets stuck in a valley - MOMENTUM

In [15]:
# VANILLA SGD  => w1 = w1 - alpha*(dE/dw1)
# Momentum SGD => v1 = momentum*v1(from previous step) - alpha*(dE/dw1)
#                 w1 = w1 + v1
from tensorflow.keras.optimizers import SGD
SGD(learning_rate=0.01, momentum=0.3)

<keras.src.optimizers.sgd.SGD at 0x7e5d949ef9d0>
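
The momentum update can be sketched in plain Python. This assumes the velocity form (velocity = momentum * velocity - learning_rate * gradient, then w = w + velocity) and the same toy error E(w) = w**2; the function name `sgd_momentum` is hypothetical.

```python
def sgd_momentum(lr: float, momentum: float, steps: int = 50, w: float = 5.0) -> float:
    """Minimise E(w) = w**2 with SGD plus momentum; momentum=0.0 is vanilla SGD."""
    velocity = 0.0
    for _ in range(steps):
        grad = 2.0 * w                               # dE/dw for E(w) = w**2
        velocity = momentum * velocity - lr * grad   # keep part of the previous step
        w = w + velocity                             # apply the accumulated step
    return w


plain = sgd_momentum(lr=0.01, momentum=0.0)
with_momentum = sgd_momentum(lr=0.01, momentum=0.3)
print(f"vanilla SGD:  |w| = {abs(plain):.4f}")
print(f"momentum 0.3: |w| = {abs(with_momentum):.4f}")
```

With the same learning rate and number of steps, the momentum run ends closer to the minimum because each step reuses part of the previous one.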

In [23]:
! pip install -q latexify-py
import latexify, math

In [17]:
# Adagrad (Adaptive Gradient)
# VANILLA SGD => w1 = w1 - alpha*(dE/dw1)
#             => w2 = w2 - alpha*(dE/dw2)
# One more problem with vanilla SGD is that it uses the same learning rate for
# all the parameters (weights & biases)

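# latexify-py renamed its decorator: recent releases expose latexify.function.
# latexify only parses the function source, so the arguments are never evaluated.
@latexify.function
def w_new(w, alpha, dE, dw):
  return w - alpha*(dE/dw)
w_new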

In [18]:
# Adagrad update, rendered with latexify.function (the current decorator name)
@latexify.function
def w_new(w, theta, G, e, dE, dw):
  return w - theta/math.sqrt(G + e)*(dE/dw)
w_new

In [19]:
# G     = sum of squares of past gradients (kept separately for each weight)
# theta = learning rate (learning_rate in Keras)
# e     = epsilon, a small constant that keeps the denominator from becoming ZERO
# Keras also exposes initial_accumulator_value, which is the starting value of G
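
The per-parameter behaviour of Adagrad can be sketched for a single weight. This is a minimal illustration, assuming the update w = w - theta/sqrt(G + e) * dE/dw from the cell above; the function name `adagrad_steps` and the gradient sequences are hypothetical.

```python
import math


def adagrad_steps(grads, lr=0.1, initial_accumulator_value=0.1, epsilon=1e-7):
    """Return the step Adagrad applies to one weight for each gradient in grads."""
    G = initial_accumulator_value          # running sum of squared gradients
    steps = []
    for g in grads:
        G += g * g                         # accumulate the squared gradient
        steps.append(lr / math.sqrt(G + epsilon) * g)
    return steps


# A weight that receives a gradient on every step vs. one that rarely does.
frequent = adagrad_steps([1.0] * 5)
rare = adagrad_steps([0.0, 0.0, 0.0, 0.0, 1.0])
print("frequent weight, last step:", frequent[-1])
print("rare weight, last step:    ", rare[-1])
```

The rarely updated weight gets a larger step for the same gradient, which is the adaptive part; the frequently updated weight's steps keep shrinking as G grows.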

In [20]:
from tensorflow.keras.optimizers import Adagrad

In [21]:
Adagrad()

<keras.src.optimizers.adagrad.Adagrad at 0x7e5d94b70a30>

In [22]:
# Adadelta, RMSprop, Adam -> try to solve the problem of Adagrad
# Problem of Adagrad      -> G (sum of squared gradients) only ever grows, so the step
#                            theta/sqrt(G + e) shrinks until the weight change is almost NIL

In [None]:
# Adadelta -> use only recent gradients, not the whole history (moving-window idea, in practice an exponentially decaying average)

In [None]:
# window = how many recent gradients effectively count; in Keras it is controlled by the decay rate rho
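
The difference between Adagrad's running sum and a decaying average of squared gradients can be sketched as follows. This is only an illustration of the accumulator, not a full Adadelta or RMSprop implementation; the function name `accumulate` and the value rho=0.9 are assumptions made for the example.

```python
def accumulate(grads, rho=None):
    """Return the squared-gradient accumulator after each gradient.

    rho=None  -> Adagrad-style running sum (never forgets).
    rho=float -> exponentially decaying average (forgets old gradients).
    """
    acc = 0.0
    history = []
    for g in grads:
        if rho is None:
            acc = acc + g * g
        else:
            acc = rho * acc + (1.0 - rho) * g * g
        history.append(acc)
    return history


grads = [1.0] * 200
running_sum = accumulate(grads)             # grows with every step
decaying_avg = accumulate(grads, rho=0.9)   # levels off near g**2
print("running sum after 200 steps:     ", running_sum[-1])
print("decaying average after 200 steps:", decaying_avg[-1])
```

Because the decaying average stays bounded, the denominator of the update stops growing and the effective step size does not collapse the way it does with the running sum.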

In [None]:
from tensorflow.keras.optimizers import Adadelta

In [None]:
Adadelta()

<tensorflow.python.keras.optimizer_v2.adadelta.Adadelta at 0x7f9223da37b8>

In [None]:
from tensorflow.keras.optimizers import RMSprop

In [None]:
RMSprop()

<tensorflow.python.keras.optimizer_v2.rmsprop.RMSprop at 0x7f9223d89dd8>

In [None]:
# DON'T FOLLOW THIS - DON'T READ THIS
# 1. SGD vs Momentum - use them for the first iteration (Momentum generally gives me a good speed)
# 2. RMSprop > Adam > Adadelta
# 3. Adagrad